Browse Source

Watch inode metadata revisions

rdma-zerocopy
Vitaliy Filippov 4 months ago
parent
commit
6950b8e3a0
  1. 4
      src/cluster_client.cpp
  2. 2
      src/cluster_client.h
  3. 26
      src/etcd_state_client.cpp
  4. 12
      src/etcd_state_client.h
  5. 2
      src/osd.h
  6. 4
      src/osd_cluster.cpp
  7. 8
      src/test_cluster_client.cpp

4
src/cluster_client.cpp

@ -58,7 +58,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
st_cli.tfd = tfd;
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_hook(changes); };
st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_hook(changes); };
st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
st_cli.parse_config(config);
@ -273,7 +273,7 @@ void cluster_client_t::on_load_pgs_hook(bool success)
continue_ops();
}
void cluster_client_t::on_change_hook(json11::Json::object & changes)
void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes)
{
for (auto pool_item: st_cli.pool_config)
{

2
src/cluster_client.h

@ -108,7 +108,7 @@ protected:
void flush_buffer(const object_id & oid, cluster_buffer_t *wr);
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(json11::Json::object & changes);
void on_change_hook(std::map<std::string, etcd_kv_t> & changes);
void on_change_osd_state_hook(uint64_t peer_osd);
int continue_rw(cluster_op_t *op);
void slice_rw(cluster_op_t *op);

26
src/etcd_state_client.cpp

@ -27,9 +27,9 @@ etcd_state_client_t::~etcd_state_client_t()
}
#ifndef __MOCK__
json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
{
json_kv_t kv;
etcd_kv_t kv;
kv.key = base64_decode(kv_json["key"].string_value());
std::string json_err, json_text = base64_decode(kv_json["value"].string_value());
kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err);
@ -38,6 +38,8 @@ json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str());
kv.key = "";
}
else
kv.mod_revision = kv_json["mod_revision"].uint64_value();
return kv;
}
@ -150,22 +152,22 @@ void etcd_state_client_t::start_etcd_watcher()
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
}
// First gather all changes into a hash to remove multiple overwrites
json11::Json::object changes;
std::map<std::string, etcd_kv_t> changes;
for (auto & ev: data["result"]["events"].array_items())
{
auto kv = parse_etcd_kv(ev["kv"]);
if (kv.key != "")
{
changes[kv.key] = kv.value;
changes[kv.key] = kv;
}
}
for (auto & kv: changes)
{
if (this->log_level > 3)
{
printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str());
printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.value.dump().c_str());
}
parse_state(kv.first, kv.second);
parse_state(kv.second);
}
// React to changes
if (on_change_hook != NULL)
@ -332,7 +334,7 @@ void etcd_state_client_t::load_pgs()
for (auto & kv_json: res["response_range"]["kvs"].array_items())
{
auto kv = parse_etcd_kv(kv_json);
parse_state(kv.key, kv.value);
parse_state(kv);
}
}
on_load_pgs_hook(true);
@ -355,13 +357,10 @@ void etcd_state_client_t::load_pgs()
}
#endif
void etcd_state_client_t::parse_state(const json_kv_t & kv)
{
parse_state(kv.key, kv.value);
}
void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value)
void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
{
const std::string & key = kv.key;
const json11::Json & value = kv.value;
if (key == etcd_prefix+"/config/pools")
{
for (auto & pool_item: this->pool_config)
@ -712,6 +711,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
.size = value["size"].uint64_value(),
.parent_id = parent_inode_num,
.readonly = value["readonly"].bool_value(),
.mod_revision = kv.mod_revision,
};
this->inode_config[inode_num] = cfg;
if (cfg.name != "")

12
src/etcd_state_client.h

@ -18,10 +18,11 @@
#define DEFAULT_BLOCK_SIZE 128*1024
struct json_kv_t
struct etcd_kv_t
{
std::string key;
json11::Json value;
uint64_t mod_revision;
};
struct pg_config_t
@ -59,6 +60,8 @@ struct inode_config_t
uint64_t size;
inode_t parent_id;
bool readonly;
// Change revision of the metadata in etcd
uint64_t mod_revision;
};
struct inode_watch_t
@ -89,21 +92,20 @@ public:
std::map<inode_t, inode_config_t> inode_config;
std::map<std::string, inode_t> inode_by_name;
std::function<void(json11::Json::object &)> on_change_hook;
std::function<void(std::map<std::string, etcd_kv_t> &)> on_change_hook;
std::function<void(json11::Json::object &)> on_load_config_hook;
std::function<json11::Json()> load_pgs_checks_hook;
std::function<void(bool)> on_load_pgs_hook;
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
std::function<void(osd_num_t)> on_change_osd_state_hook;
json_kv_t parse_etcd_kv(const json11::Json & kv_json);
etcd_kv_t parse_etcd_kv(const json11::Json & kv_json);
void etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback);
void etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback);
void start_etcd_watcher();
void load_global_config();
void load_pgs();
void parse_state(const json_kv_t & kv);
void parse_state(const std::string & key, const json11::Json & value);
void parse_state(const etcd_kv_t & kv);
void parse_config(json11::Json & config);
inode_watch_t* watch_inode(std::string name);
void close_watch(inode_watch_t* watch);

2
src/osd.h

@ -147,7 +147,7 @@ class osd_t
void init_cluster();
void on_change_osd_state_hook(osd_num_t peer_osd);
void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num);
void on_change_etcd_state_hook(json11::Json::object & changes);
void on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes);
void on_load_config_hook(json11::Json::object & changes);
json11::Json on_load_pgs_checks_hook();
void on_load_pgs_hook(bool success);

4
src/osd_cluster.cpp

@ -65,7 +65,7 @@ void osd_t::init_cluster()
st_cli.log_level = log_level;
st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); };
st_cli.on_change_pg_history_hook = [this](pool_id_t pool_id, pg_num_t pg_num) { on_change_pg_history_hook(pool_id, pg_num); };
st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); };
st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_etcd_state_hook(changes); };
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); };
st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
@ -272,7 +272,7 @@ void osd_t::on_change_osd_state_hook(osd_num_t peer_osd)
}
}
void osd_t::on_change_etcd_state_hook(json11::Json::object & changes)
void osd_t::on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes)
{
// FIXME apply config changes in runtime (maybe, some)
if (run_primary)

8
src/test_cluster_client.cpp

@ -9,7 +9,7 @@
void configure_single_pg_pool(cluster_client_t *cli)
{
cli->st_cli.on_load_pgs_hook(true);
cli->st_cli.parse_state((json_kv_t){
cli->st_cli.parse_state((etcd_kv_t){
.key = "/config/pools",
.value = json11::Json::object {
{ "1", json11::Json::object {
@ -22,7 +22,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
} }
},
});
cli->st_cli.parse_state((json_kv_t){
cli->st_cli.parse_state((etcd_kv_t){
.key = "/config/pgs",
.value = json11::Json::object {
{ "items", json11::Json::object {
@ -35,7 +35,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
} }
},
});
cli->st_cli.parse_state((json_kv_t){
cli->st_cli.parse_state((etcd_kv_t){
.key = "/pg/state/1/1",
.value = json11::Json::object {
{ "peers", json11::Json::array { 1, 2 } },
@ -43,7 +43,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
{ "state", json11::Json::array { "active" } },
},
});
json11::Json::object changes;
std::map<std::string, etcd_kv_t> changes;
cli->st_cli.on_change_hook(changes);
}

Loading…
Cancel
Save