// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

/**
 * Inode removal tool
 * May be included into a bigger "command-line management interface" in the future
 */

#include <vector>
#include <algorithm>
#include <set>

#include "epoll_manager.h"
#include "cluster_client.h"
#include "pg_states.h"
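
// Removal of an inode from a PG goes through three stages:
// list its object versions (RM_LISTING), delete them (RM_REMOVING), done (RM_END)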
#define RM_LISTING 1
#define RM_REMOVING 2
#define RM_END 3

const char *exe_name = NULL;

struct rm_pg_t;

struct rm_pg_osd_t
{
    rm_pg_t *pg = NULL;
    osd_num_t osd_num;
    bool sent = false;
};
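
// Per-PG removal context: the primary OSD to send deletes to, the OSDs to list from,
// the collected object set and the progress counters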
struct rm_pg_t
{
    pg_num_t pg_num;
    osd_num_t rm_osd_num;
    std::vector<rm_pg_osd_t> list_osds;
    int state = 0;
    int to_list;
    std::set<object_id> objects;
    std::set<object_id>::iterator obj_pos;
    uint64_t obj_count = 0, obj_done = 0, obj_prev_done = 0;
    int in_flight = 0;
};
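
// Removes all objects of one inode from the cluster: lists every PG of the pool
// and sends OSD_OP_DELETE for each object version found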
class rm_inode_t
{
protected:
    uint64_t inode = 0;
    pool_id_t pool_id = 0;
    uint64_t iodepth = 0, parallel_osds = 0;

    ring_loop_t *ringloop = NULL;
    epoll_manager_t *epmgr = NULL;
    cluster_client_t *cli = NULL;
    ring_consumer_t consumer;

    std::vector<rm_pg_t*> lists;
    uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
    uint64_t pgs_to_list = 0;
    bool started = false;
    bool progress = true;
    bool list_first = false;
    int log_level = 0;

public:
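    // Convert "--key value" style arguments into a json11 configuration object;
    // "--json" and "--wait-list" are treated as boolean flags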
    static json11::Json::object parse_args(int narg, const char *args[])
    {
        json11::Json::object cfg;
        cfg["progress"] = "1";
        for (int i = 1; i < narg; i++)
        {
            if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
            {
                help();
            }
            else if (args[i][0] == '-' && args[i][1] == '-')
            {
                const char *opt = args[i]+2;
                cfg[opt] = !strcmp(opt, "json") || !strcmp(opt, "wait-list") || i == narg-1 ? "1" : args[++i];
            }
        }
        return cfg;
    }
    static void help()
    {
        printf(
            "Vitastor inode removal tool\n"
            "(c) Vitaliy Filippov, 2020 (VNPL-1.0)\n\n"
            "USAGE:\n"
            " %s --etcd_address <etcd_address> --pool <pool> --inode <inode> [--wait-list]\n",
            exe_name
        );
        exit(0);
    }
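
    // Validate options, create the cluster client and run the event loop until the job completes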
    void run(json11::Json cfg)
    {
        if (cfg["etcd_address"].string_value() == "")
        {
            fprintf(stderr, "etcd_address is missing\n");
            exit(1);
        }
        inode = cfg["inode"].uint64_value();
        pool_id = cfg["pool"].uint64_value();
        if (pool_id)
            inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)pool_id) << (64-POOL_ID_BITS));
        pool_id = INODE_POOL(inode);
        if (!pool_id)
        {
            fprintf(stderr, "pool is missing\n");
            exit(1);
        }
        iodepth = cfg["iodepth"].uint64_value();
        if (!iodepth)
            iodepth = 32;
        parallel_osds = cfg["parallel_osds"].uint64_value();
        if (!parallel_osds)
            parallel_osds = 4;
        log_level = cfg["log_level"].int64_value();
        progress = cfg["progress"].uint64_value() ? true : false;
        list_first = cfg["wait-list"].uint64_value() ? true : false;
        // Create client
        ringloop = new ring_loop_t(512);
        epmgr = new epoll_manager_t(ringloop);
        cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
        cli->on_ready([this]() { start_delete(); });
        // Initialize job
        consumer.loop = [this]()
        {
            if (started)
                continue_delete();
            ringloop->submit();
        };
        ringloop->register_consumer(&consumer);
        // Loop until it completes
        while (1)
        {
            ringloop->loop();
            ringloop->wait();
        }
    }
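
    // Build the per-PG work list from the pool configuration loaded from etcd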
    void start_delete()
    {
        if (cli->st_cli.pool_config.find(pool_id) == cli->st_cli.pool_config.end())
        {
            fprintf(stderr, "Pool %u does not exist\n", pool_id);
            exit(1);
        }
        auto pool_cfg = cli->st_cli.pool_config[pool_id];
        for (auto & pg_item: pool_cfg.pg_config)
        {
            auto & pg = pg_item.second;
            if (pg.pause || !pg.cur_primary || !(pg.cur_state & PG_ACTIVE))
            {
                fprintf(stderr, "PG %u is inactive, skipping\n", pg_item.first);
                continue;
            }
            rm_pg_t *r = new rm_pg_t();
            r->pg_num = pg_item.first;
            r->rm_osd_num = pg.cur_primary;
            r->state = RM_LISTING;
            if (pg.cur_state != PG_ACTIVE)
            {
                // The PG is not fully clean - list objects from all OSDs that may hold copies,
                // not just from the current primary
                std::set<osd_num_t> all_peers;
                for (osd_num_t pg_osd: pg.target_set)
                {
                    if (pg_osd != 0)
                    {
                        all_peers.insert(pg_osd);
                    }
                }
                for (osd_num_t pg_osd: pg.all_peers)
                {
                    if (pg_osd != 0)
                    {
                        all_peers.insert(pg_osd);
                    }
                }
                for (auto & hist_item: pg.target_history)
                {
                    for (auto pg_osd: hist_item)
                    {
                        if (pg_osd != 0)
                        {
                            all_peers.insert(pg_osd);
                        }
                    }
                }
                for (osd_num_t peer_osd: all_peers)
                {
                    r->list_osds.push_back((rm_pg_osd_t){ .pg = r, .osd_num = peer_osd, .sent = false });
                }
            }
            else
            {
                // The PG is clean - listing objects from the primary OSD is enough
                r->list_osds.push_back((rm_pg_osd_t){ .pg = r, .osd_num = pg.cur_primary, .sent = false });
            }
            r->to_list = r->list_osds.size();
            lists.push_back(r);
        }
        std::sort(lists.begin(), lists.end(), [](rm_pg_t *a, rm_pg_t *b)
        {
            return a->rm_osd_num < b->rm_osd_num ? true : false;
        });
        pgs_to_list = lists.size();
        started = true;
        continue_delete();
    }
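
    // Send an OSD_OP_SEC_LIST request to one OSD to enumerate this inode's object versions in the PG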
    void send_list(rm_pg_osd_t *cur_list)
    {
        if (cur_list->sent)
        {
            return;
        }
        if (cli->msgr.osd_peer_fds.find(cur_list->osd_num) ==
            cli->msgr.osd_peer_fds.end())
        {
            // Initiate connection
            cli->msgr.connect_peer(cur_list->osd_num, cli->st_cli.peer_states[cur_list->osd_num]);
            return;
        }
        osd_op_t *op = new osd_op_t();
        op->op_type = OSD_OP_OUT;
        op->peer_fd = cli->msgr.osd_peer_fds[cur_list->osd_num];
        op->req = (osd_any_op_t){
            .sec_list = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = cli->msgr.next_subop_id++,
                    .opcode = OSD_OP_SEC_LIST,
                },
                .list_pg = cur_list->pg->pg_num,
                .pg_count = (pg_num_t)cli->st_cli.pool_config[pool_id].real_pg_count,
                .pg_stripe_size = cli->st_cli.pool_config[pool_id].pg_stripe_size,
                .min_inode = inode,
                .max_inode = inode,
            },
        };
        op->callback = [this, cur_list](osd_op_t *op)
        {
            cur_list->pg->to_list--;
            if (op->reply.hdr.retval < 0)
            {
                fprintf(stderr, "Failed to get PG %u/%u object list from OSD %lu (retval=%ld), skipping\n",
                    pool_id, cur_list->pg->pg_num, cur_list->osd_num, op->reply.hdr.retval);
            }
            else
            {
                if (op->reply.sec_list.stable_count < op->reply.hdr.retval)
                {
                    // Unstable objects, if present, mean that someone still writes into the inode. Warn the user about it.
                    printf(
                        "[PG %u/%u] Inode still has %lu unstable object versions - is it still open? Not a good idea to delete it.\n",
                        pool_id, cur_list->pg->pg_num, op->reply.hdr.retval - op->reply.sec_list.stable_count
                    );
                }
                if (log_level > 0)
                {
                    printf(
                        "[PG %u/%u] Got inode object list from OSD %lu: %ld object versions\n",
                        pool_id, cur_list->pg->pg_num, cur_list->osd_num, op->reply.hdr.retval
                    );
                }
                for (uint64_t i = 0; i < op->reply.hdr.retval; i++)
                {
                    object_id oid = ((obj_ver_id*)op->buf)[i].oid;
                    oid.stripe = oid.stripe & ~STRIPE_MASK;
                    cur_list->pg->objects.insert(oid);
                }
            }
            delete op;
            if (cur_list->pg->to_list <= 0)
            {
                // All listings for this PG are done - switch it to the removal stage
                cur_list->pg->obj_done = cur_list->pg->obj_prev_done = 0;
                cur_list->pg->obj_pos = cur_list->pg->objects.begin();
                cur_list->pg->obj_count = cur_list->pg->objects.size();
                total_count += cur_list->pg->obj_count;
                total_prev_pct = 0;
                cur_list->pg->state = RM_REMOVING;
                pgs_to_list--;
            }
            continue_delete();
        };
        cli->msgr.outbox_push(op);
        cur_list->sent = true;
    }
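
    // Keep up to <iodepth> OSD_OP_DELETE operations in flight for this PG's primary OSD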
    void send_ops(rm_pg_t *cur_list)
    {
        if (cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
            cli->msgr.osd_peer_fds.end())
        {
            // Initiate connection
            cli->msgr.connect_peer(cur_list->rm_osd_num, cli->st_cli.peer_states[cur_list->rm_osd_num]);
            return;
        }
        while (cur_list->in_flight < iodepth && cur_list->obj_pos != cur_list->objects.end())
        {
            osd_op_t *op = new osd_op_t();
            op->op_type = OSD_OP_OUT;
            op->peer_fd = cli->msgr.osd_peer_fds[cur_list->rm_osd_num];
            op->req = (osd_any_op_t){
                .rw = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = cli->msgr.next_subop_id++,
                        .opcode = OSD_OP_DELETE,
                    },
                    .inode = cur_list->obj_pos->inode,
                    .offset = (cur_list->obj_pos->stripe & ~STRIPE_MASK),
                    .len = 0,
                },
            };
            op->callback = [this, cur_list](osd_op_t *op)
            {
                cur_list->in_flight--;
                if (op->reply.hdr.retval < 0)
                {
                    fprintf(stderr, "Failed to remove object from PG %u (OSD %lu) (retval=%ld)\n",
                        cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval);
                }
                delete op;
                cur_list->obj_done++;
                total_done++;
                continue_delete();
            };
            cli->msgr.outbox_push(op);
            cur_list->obj_pos++;
            cur_list->in_flight++;
        }
        if (!cur_list->in_flight && cur_list->obj_pos == cur_list->objects.end())
        {
            // All deletes for this PG have completed
            cur_list->obj_count = 0;
            cur_list->obj_done = cur_list->obj_prev_done = 0;
            cur_list->state = RM_END;
        }
    }
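
    // Drive the state machine: work on up to <parallel_osds> distinct primary OSDs at a time,
    // free finished PGs, print progress and exit when everything is removed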
    void continue_delete()
    {
        int par_osd = 0;
        osd_num_t max_seen_osd = 0;
        bool no_del = false;
        if (list_first)
        {
            // With --wait-list, do not start deleting until all PGs are listed
            int i, n = 0;
            for (i = 0; i < lists.size(); i++)
            {
                if (lists[i]->state == RM_LISTING)
                {
                    n++;
                }
            }
            if (n > 0)
            {
                no_del = true;
            }
        }
        for (int i = 0; i < lists.size(); i++)
        {
            if (lists[i]->state == RM_END)
            {
                delete lists[i];
                lists.erase(lists.begin()+i, lists.begin()+i+1);
                i--;
            }
            else if (lists[i]->rm_osd_num > max_seen_osd)
            {
                if (lists[i]->state == RM_LISTING)
                {
                    for (int j = 0; j < lists[i]->list_osds.size(); j++)
                    {
                        send_list(&lists[i]->list_osds[j]);
                    }
                }
                else if (lists[i]->state == RM_REMOVING)
                {
                    if (no_del)
                    {
                        continue;
                    }
                    send_ops(lists[i]);
                }
                par_osd++;
                max_seen_osd = lists[i]->rm_osd_num;
                if (par_osd >= parallel_osds)
                {
                    break;
                }
            }
        }
        if (progress && total_count > 0 && total_done*1000/total_count != total_prev_pct)
        {
            printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list);
            total_prev_pct = total_done*1000/total_count;
        }
        if (!lists.size())
        {
            printf("Done, inode %lu in pool %u removed\n", (inode & ((1l << (64-POOL_ID_BITS)) - 1)), pool_id);
            exit(0);
        }
    }
};
int main(int narg, const char *args[])
{
    // Unbuffered output so the "\r" progress line appears immediately
    setvbuf(stdout, NULL, _IONBF, 0);
    setvbuf(stderr, NULL, _IONBF, 0);
    exe_name = args[0];
    rm_inode_t *p = new rm_inode_t();
    p->run(rm_inode_t::parse_args(narg, args));
    return 0;
}