Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

451 lines
13 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include <unordered_map>
  4. #include "osd_peering_pg.h"
  5. struct obj_ver_role
  6. {
  7. object_id oid;
  8. uint64_t version;
  9. uint64_t osd_num;
  10. bool is_stable;
  11. };
  12. inline bool operator < (const obj_ver_role & a, const obj_ver_role & b)
  13. {
  14. // ORDER BY inode ASC, stripe & ~STRIPE_MASK ASC, version DESC, role ASC, osd_num ASC
  15. return a.oid.inode < b.oid.inode || a.oid.inode == b.oid.inode && (
  16. (a.oid.stripe & ~STRIPE_MASK) < (b.oid.stripe & ~STRIPE_MASK) ||
  17. (a.oid.stripe & ~STRIPE_MASK) == (b.oid.stripe & ~STRIPE_MASK) && (
  18. a.version > b.version ||
  19. a.version == b.version && (
  20. a.oid.stripe < b.oid.stripe ||
  21. a.oid.stripe == b.oid.stripe && a.osd_num < b.osd_num
  22. )
  23. )
  24. );
  25. }
  // Per-piece version summary collected while scanning all listed versions
  // of one object. Zero means "not seen yet" for every field.
  struct obj_piece_ver_t
  {
      // Highest version listed for the piece
      uint64_t max_ver{0};
      // Highest version of the piece that is listed as stable
      uint64_t stable_ver{0};
      // Highest version of the piece that does not exceed the target version
      uint64_t max_target{0};
  };
  32. struct pg_obj_state_check_t
  33. {
  34. pg_t *pg;
  35. bool replicated = false;
  36. std::vector<obj_ver_role> list;
  37. int list_pos;
  38. int obj_start = 0, obj_end = 0, ver_start = 0, ver_end = 0;
  39. object_id oid = { 0 };
  40. uint64_t max_ver = 0;
  41. uint64_t last_ver = 0;
  42. uint64_t target_ver = 0;
  43. uint64_t n_copies = 0, has_roles = 0, n_roles = 0, n_stable = 0, n_mismatched = 0;
  44. uint64_t n_unstable = 0, n_invalid = 0;
  45. pg_osd_set_t osd_set;
  46. int log_level;
  47. void walk();
  48. void start_object();
  49. void handle_version();
  50. void finish_object();
  51. };
  52. void pg_obj_state_check_t::walk()
  53. {
  54. pg->clean_count = 0;
  55. pg->total_count = 0;
  56. pg->state = 0;
  57. for (list_pos = 0; list_pos < list.size(); list_pos++)
  58. {
  59. if (oid.inode != list[list_pos].oid.inode ||
  60. oid.stripe != (list[list_pos].oid.stripe & ~STRIPE_MASK))
  61. {
  62. if (oid.inode != 0)
  63. {
  64. finish_object();
  65. }
  66. start_object();
  67. }
  68. handle_version();
  69. }
  70. if (oid.inode != 0)
  71. {
  72. finish_object();
  73. }
  74. if (pg->state & PG_HAS_INVALID)
  75. {
  76. // Stop PGs with "invalid" objects
  77. pg->state = PG_INCOMPLETE | PG_HAS_INVALID;
  78. return;
  79. }
  80. if (pg->pg_cursize < pg->pg_size)
  81. {
  82. pg->state |= PG_DEGRADED;
  83. }
  84. pg->state |= PG_ACTIVE;
  85. if (pg->state == PG_ACTIVE && pg->cur_peers.size() < pg->all_peers.size())
  86. {
  87. pg->state |= PG_LEFT_ON_DEAD;
  88. }
  89. }
  90. void pg_obj_state_check_t::start_object()
  91. {
  92. obj_start = list_pos;
  93. oid = { .inode = list[list_pos].oid.inode, .stripe = list[list_pos].oid.stripe & ~STRIPE_MASK };
  94. last_ver = max_ver = list[list_pos].version;
  95. target_ver = 0;
  96. ver_start = list_pos;
  97. has_roles = n_copies = n_roles = n_stable = n_mismatched = 0;
  98. n_unstable = n_invalid = 0;
  99. }
  100. void pg_obj_state_check_t::handle_version()
  101. {
  102. if (!target_ver && last_ver != list[list_pos].version && (n_stable > 0 || n_roles >= pg->pg_data_size))
  103. {
  104. // Version is either stable or recoverable
  105. target_ver = last_ver;
  106. ver_end = list_pos;
  107. }
  108. if (!target_ver)
  109. {
  110. if (last_ver != list[list_pos].version)
  111. {
  112. ver_start = list_pos;
  113. has_roles = n_copies = n_roles = n_stable = n_mismatched = 0;
  114. last_ver = list[list_pos].version;
  115. }
  116. unsigned replica = (list[list_pos].oid.stripe & STRIPE_MASK);
  117. n_copies++;
  118. if (replicated && replica > 0 || replica >= pg->pg_size)
  119. {
  120. n_invalid++;
  121. }
  122. else
  123. {
  124. if (list[list_pos].is_stable)
  125. {
  126. n_stable++;
  127. }
  128. if (replicated)
  129. {
  130. int i;
  131. for (i = 0; i < pg->cur_set.size(); i++)
  132. {
  133. if (pg->cur_set[i] == list[list_pos].osd_num)
  134. {
  135. break;
  136. }
  137. }
  138. if (i == pg->cur_set.size())
  139. {
  140. n_mismatched++;
  141. }
  142. }
  143. else
  144. {
  145. if (pg->cur_set[replica] != list[list_pos].osd_num)
  146. {
  147. n_mismatched++;
  148. }
  149. if (!(has_roles & (1 << replica)))
  150. {
  151. has_roles = has_roles | (1 << replica);
  152. n_roles++;
  153. }
  154. }
  155. }
  156. }
  157. if (!list[list_pos].is_stable)
  158. {
  159. n_unstable++;
  160. }
  161. }
  162. void pg_obj_state_check_t::finish_object()
  163. {
  164. if (!target_ver && (n_stable > 0 || n_roles >= pg->pg_data_size))
  165. {
  166. // Version is either stable or recoverable
  167. target_ver = last_ver;
  168. ver_end = list_pos;
  169. }
  170. obj_end = list_pos;
  171. // Remember the decision
  172. uint64_t state = 0;
  173. if (n_invalid > 0)
  174. {
  175. // It's not allowed to change the replication scheme for a pool other than by recreating it
  176. // So we must bring the PG offline
  177. state = OBJ_INCOMPLETE;
  178. pg->state |= PG_HAS_INVALID;
  179. pg->total_count++;
  180. return;
  181. }
  182. if (n_unstable > 0)
  183. {
  184. pg->state |= PG_HAS_UNCLEAN;
  185. std::unordered_map<obj_piece_id_t, obj_piece_ver_t> pieces;
  186. for (int i = obj_start; i < obj_end; i++)
  187. {
  188. auto & pcs = pieces[(obj_piece_id_t){ .oid = list[i].oid, .osd_num = list[i].osd_num }];
  189. if (!pcs.max_ver)
  190. {
  191. pcs.max_ver = list[i].version;
  192. }
  193. if (list[i].is_stable && !pcs.stable_ver)
  194. {
  195. pcs.stable_ver = list[i].version;
  196. }
  197. if (list[i].version <= target_ver && !pcs.max_target)
  198. {
  199. pcs.max_target = list[i].version;
  200. }
  201. }
  202. for (auto pp: pieces)
  203. {
  204. auto & pcs = pp.second;
  205. if (pcs.stable_ver < pcs.max_ver)
  206. {
  207. auto & act = pg->flush_actions[pp.first];
  208. // osd_set doesn't include rollback/stable states, so don't include them in the state code either
  209. if (pcs.max_ver > target_ver)
  210. {
  211. act.rollback = true;
  212. act.rollback_to = pcs.max_target;
  213. }
  214. if (pcs.stable_ver < (pcs.max_ver > target_ver ? pcs.max_target : pcs.max_ver))
  215. {
  216. act.make_stable = true;
  217. act.stable_to = pcs.max_ver > target_ver ? pcs.max_target : pcs.max_ver;
  218. }
  219. }
  220. }
  221. }
  222. if (!target_ver)
  223. {
  224. return;
  225. }
  226. if (!replicated && n_roles < pg->pg_data_size)
  227. {
  228. if (log_level > 1)
  229. {
  230. printf("Object is incomplete: %lx:%lx version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
  231. }
  232. state = OBJ_INCOMPLETE;
  233. pg->state = pg->state | PG_HAS_INCOMPLETE;
  234. }
  235. else if ((replicated ? n_copies : n_roles) < pg->pg_cursize)
  236. {
  237. if (log_level > 1)
  238. {
  239. printf("Object is degraded: %lx:%lx version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
  240. }
  241. state = OBJ_DEGRADED;
  242. pg->state = pg->state | PG_HAS_DEGRADED;
  243. }
  244. else if (n_mismatched > 0)
  245. {
  246. if (log_level > 2 && (replicated || n_roles >= pg->pg_cursize))
  247. {
  248. printf("Object is misplaced: %lx:%lx version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
  249. }
  250. state |= OBJ_MISPLACED;
  251. pg->state = pg->state | PG_HAS_MISPLACED;
  252. }
  253. if (log_level > 1 && (state & (OBJ_INCOMPLETE | OBJ_DEGRADED)) ||
  254. log_level > 2 && (state & OBJ_MISPLACED))
  255. {
  256. for (int i = obj_start; i < obj_end; i++)
  257. {
  258. printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num,
  259. (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
  260. }
  261. }
  262. pg->total_count++;
  263. if (state != 0 || ver_end < obj_end)
  264. {
  265. osd_set.clear();
  266. for (int i = ver_start; i < ver_end; i++)
  267. {
  268. osd_set.push_back((pg_obj_loc_t){
  269. .role = (list[i].oid.stripe & STRIPE_MASK),
  270. .osd_num = list[i].osd_num,
  271. .outdated = false,
  272. });
  273. }
  274. }
  275. if (ver_end < obj_end)
  276. {
  277. // Check for outdated versions not present in the current target OSD set
  278. for (int i = ver_end; i < obj_end; i++)
  279. {
  280. int j;
  281. for (j = 0; j < osd_set.size(); j++)
  282. {
  283. if (osd_set[j].osd_num == list[i].osd_num)
  284. {
  285. break;
  286. }
  287. }
  288. if (j >= osd_set.size() && pg->cur_set[list[i].oid.stripe & STRIPE_MASK] != list[i].osd_num)
  289. {
  290. osd_set.push_back((pg_obj_loc_t){
  291. .role = (list[i].oid.stripe & STRIPE_MASK),
  292. .osd_num = list[i].osd_num,
  293. .outdated = true,
  294. });
  295. if (!(state & (OBJ_INCOMPLETE | OBJ_DEGRADED)))
  296. {
  297. state |= OBJ_MISPLACED;
  298. pg->state = pg->state | PG_HAS_MISPLACED;
  299. }
  300. }
  301. }
  302. }
  303. if (target_ver < max_ver)
  304. {
  305. pg->ver_override[oid] = target_ver;
  306. }
  307. if (state == 0)
  308. {
  309. pg->clean_count++;
  310. }
  311. else
  312. {
  313. auto it = pg->state_dict.find(osd_set);
  314. if (it == pg->state_dict.end())
  315. {
  316. std::vector<uint64_t> read_target;
  317. if (replicated)
  318. {
  319. for (auto & o: osd_set)
  320. {
  321. if (!o.outdated)
  322. {
  323. read_target.push_back(o.osd_num);
  324. }
  325. }
  326. while (read_target.size() < pg->pg_size)
  327. {
  328. // FIXME: This is because we then use .data() and assume it's at least <pg_size> long
  329. read_target.push_back(0);
  330. }
  331. }
  332. else
  333. {
  334. read_target.resize(pg->pg_size);
  335. for (int i = 0; i < pg->pg_size; i++)
  336. {
  337. read_target[i] = 0;
  338. }
  339. for (auto & o: osd_set)
  340. {
  341. if (!o.outdated)
  342. {
  343. read_target[o.role] = o.osd_num;
  344. }
  345. }
  346. }
  347. pg->state_dict[osd_set] = {
  348. .read_target = read_target,
  349. .osd_set = osd_set,
  350. .state = state,
  351. .object_count = 1,
  352. };
  353. it = pg->state_dict.find(osd_set);
  354. }
  355. else
  356. {
  357. it->second.object_count++;
  358. }
  359. if (state & OBJ_INCOMPLETE)
  360. {
  361. pg->incomplete_objects[oid] = &it->second;
  362. }
  363. else if (state & OBJ_DEGRADED)
  364. {
  365. pg->degraded_objects[oid] = &it->second;
  366. }
  367. else
  368. {
  369. pg->misplaced_objects[oid] = &it->second;
  370. }
  371. }
  372. }
  373. // FIXME: Write at least some tests for this function
  374. void pg_t::calc_object_states(int log_level)
  375. {
  376. // Copy all object lists into one array
  377. pg_obj_state_check_t st;
  378. st.log_level = log_level;
  379. st.pg = this;
  380. st.replicated = (this->scheme == POOL_SCHEME_REPLICATED);
  381. auto ps = peering_state;
  382. epoch = 0;
  383. for (auto it: ps->list_results)
  384. {
  385. auto nstab = it.second.stable_count;
  386. auto n = it.second.total_count;
  387. auto osd_num = it.first;
  388. uint64_t start = st.list.size();
  389. st.list.resize(start + n);
  390. obj_ver_id *ov = it.second.buf;
  391. for (uint64_t i = 0; i < n; i++, ov++)
  392. {
  393. if ((ov->version >> (64-PG_EPOCH_BITS)) > epoch)
  394. {
  395. epoch = (ov->version >> (64-PG_EPOCH_BITS));
  396. }
  397. st.list[start+i] = {
  398. .oid = ov->oid,
  399. .version = ov->version,
  400. .osd_num = osd_num,
  401. .is_stable = i < nstab,
  402. };
  403. }
  404. free(it.second.buf);
  405. it.second.buf = NULL;
  406. }
  407. ps->list_results.clear();
  408. // Sort
  409. std::sort(st.list.begin(), st.list.end());
  410. // Walk over it and check object states
  411. st.walk();
  412. if (this->state & (PG_DEGRADED|PG_LEFT_ON_DEAD))
  413. {
  414. assert(epoch != ((1ul << PG_EPOCH_BITS)-1));
  415. epoch++;
  416. }
  417. }
  418. void pg_t::print_state()
  419. {
  420. printf(
  421. "[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
  422. (state & PG_STARTING) ? "starting" : "",
  423. (state & PG_OFFLINE) ? "offline" : "",
  424. (state & PG_PEERING) ? "peering" : "",
  425. (state & PG_INCOMPLETE) ? "incomplete" : "",
  426. (state & PG_ACTIVE) ? "active" : "",
  427. (state & PG_REPEERING) ? "repeering" : "",
  428. (state & PG_STOPPING) ? "stopping" : "",
  429. (state & PG_DEGRADED) ? " + degraded" : "",
  430. (state & PG_HAS_INCOMPLETE) ? " + has_incomplete" : "",
  431. (state & PG_HAS_DEGRADED) ? " + has_degraded" : "",
  432. (state & PG_HAS_MISPLACED) ? " + has_misplaced" : "",
  433. (state & PG_HAS_UNCLEAN) ? " + has_unclean" : "",
  434. (state & PG_HAS_INVALID) ? " + has_invalid" : "",
  435. (state & PG_LEFT_ON_DEAD) ? " + left_on_dead" : "",
  436. total_count
  437. );
  438. }