Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

331 lines
11 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include "osd.h"
  4. #define FLUSH_BATCH 512
  5. void osd_t::submit_pg_flush_ops(pg_t & pg)
  6. {
  7. pg_flush_batch_t *fb = new pg_flush_batch_t();
  8. pg.flush_batch = fb;
  9. auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
  10. bool first = true;
  11. while (it != pg.flush_actions.end())
  12. {
  13. if (!first && (it->first.oid.inode != prev_it->first.oid.inode ||
  14. (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK)) &&
  15. fb->rollback_lists[it->first.osd_num].size() >= FLUSH_BATCH ||
  16. fb->stable_lists[it->first.osd_num].size() >= FLUSH_BATCH)
  17. {
  18. // Stop only at the object boundary
  19. break;
  20. }
  21. it->second.submitted = true;
  22. if (it->second.rollback)
  23. {
  24. fb->flush_objects++;
  25. fb->rollback_lists[it->first.osd_num].push_back((obj_ver_id){
  26. .oid = it->first.oid,
  27. .version = it->second.rollback_to,
  28. });
  29. }
  30. if (it->second.make_stable)
  31. {
  32. fb->flush_objects++;
  33. fb->stable_lists[it->first.osd_num].push_back((obj_ver_id){
  34. .oid = it->first.oid,
  35. .version = it->second.stable_to,
  36. });
  37. }
  38. prev_it = it;
  39. first = false;
  40. it++;
  41. }
  42. for (auto & l: fb->rollback_lists)
  43. {
  44. if (l.second.size() > 0)
  45. {
  46. fb->flush_ops++;
  47. submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
  48. }
  49. }
  50. for (auto & l: fb->stable_lists)
  51. {
  52. if (l.second.size() > 0)
  53. {
  54. fb->flush_ops++;
  55. submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
  56. }
  57. }
  58. }
  59. void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval)
  60. {
  61. pool_pg_num_t pg_id = { .pool_id = pool_id, .pg_num = pg_num };
  62. if (pgs.find(pg_id) == pgs.end() || pgs[pg_id].flush_batch != fb)
  63. {
  64. // Throw the result away
  65. return;
  66. }
  67. if (retval != 0)
  68. {
  69. if (peer_osd == this->osd_num)
  70. {
  71. throw std::runtime_error(
  72. std::string(rollback
  73. ? "Error while doing local rollback operation: "
  74. : "Error while doing local stabilize operation: "
  75. ) + strerror(-retval)
  76. );
  77. }
  78. else
  79. {
  80. printf("Error while doing flush on OSD %lu: %d (%s)\n", osd_num, retval, strerror(-retval));
  81. auto fd_it = msgr.osd_peer_fds.find(peer_osd);
  82. if (fd_it != msgr.osd_peer_fds.end())
  83. {
  84. msgr.stop_client(fd_it->second);
  85. }
  86. return;
  87. }
  88. }
  89. fb->flush_done++;
  90. if (fb->flush_done == fb->flush_ops)
  91. {
  92. // This flush batch is done
  93. std::vector<osd_op_t*> continue_ops;
  94. auto & pg = pgs.at(pg_id);
  95. auto it = pg.flush_actions.begin(), prev_it = it;
  96. auto erase_start = it;
  97. while (1)
  98. {
  99. if (it == pg.flush_actions.end() ||
  100. it->first.oid.inode != prev_it->first.oid.inode ||
  101. (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK))
  102. {
  103. pg.ver_override.erase((object_id){
  104. .inode = prev_it->first.oid.inode,
  105. .stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
  106. });
  107. auto wr_it = pg.write_queue.find((object_id){
  108. .inode = prev_it->first.oid.inode,
  109. .stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
  110. });
  111. if (wr_it != pg.write_queue.end())
  112. {
  113. continue_ops.push_back(wr_it->second);
  114. pg.write_queue.erase(wr_it);
  115. }
  116. }
  117. if ((it == pg.flush_actions.end() || !it->second.submitted) &&
  118. erase_start != it)
  119. {
  120. pg.flush_actions.erase(erase_start, it);
  121. }
  122. if (it == pg.flush_actions.end())
  123. {
  124. break;
  125. }
  126. prev_it = it;
  127. if (!it->second.submitted)
  128. {
  129. it++;
  130. erase_start = it;
  131. }
  132. else
  133. {
  134. it++;
  135. }
  136. }
  137. delete fb;
  138. pg.flush_batch = NULL;
  139. if (!pg.flush_actions.size())
  140. {
  141. pg.state = pg.state & ~PG_HAS_UNCLEAN;
  142. report_pg_state(pg);
  143. }
  144. for (osd_op_t *op: continue_ops)
  145. {
  146. continue_primary_write(op);
  147. }
  148. if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
  149. {
  150. finish_stop_pg(pg);
  151. }
  152. else if ((pg.state & PG_REPEERING) && pg.inflight == 0 && !pg.flush_batch)
  153. {
  154. start_pg_peering(pg);
  155. }
  156. }
  157. }
  158. void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
  159. {
  160. osd_op_t *op = new osd_op_t();
  161. // Copy buffer so it gets freed along with the operation
  162. op->buf = malloc_or_die(sizeof(obj_ver_id) * count);
  163. memcpy(op->buf, data, sizeof(obj_ver_id) * count);
  164. if (peer_osd == this->osd_num)
  165. {
  166. // local
  167. clock_gettime(CLOCK_REALTIME, &op->tv_begin);
  168. op->bs_op = new blockstore_op_t((blockstore_op_t){
  169. .opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
  170. .callback = [this, op, pool_id, pg_num, fb](blockstore_op_t *bs_op)
  171. {
  172. add_bs_subop_stats(op);
  173. handle_flush_op(bs_op->opcode == BS_OP_ROLLBACK, pool_id, pg_num, fb, this->osd_num, bs_op->retval);
  174. delete op->bs_op;
  175. op->bs_op = NULL;
  176. delete op;
  177. },
  178. .len = (uint32_t)count,
  179. .buf = op->buf,
  180. });
  181. bs->enqueue_op(op->bs_op);
  182. }
  183. else
  184. {
  185. // Peer
  186. int peer_fd = msgr.osd_peer_fds[peer_osd];
  187. op->op_type = OSD_OP_OUT;
  188. op->iov.push_back(op->buf, count * sizeof(obj_ver_id));
  189. op->peer_fd = peer_fd;
  190. op->req = (osd_any_op_t){
  191. .sec_stab = {
  192. .header = {
  193. .magic = SECONDARY_OSD_OP_MAGIC,
  194. .id = msgr.next_subop_id++,
  195. .opcode = (uint64_t)(rollback ? OSD_OP_SEC_ROLLBACK : OSD_OP_SEC_STABILIZE),
  196. },
  197. .len = count * sizeof(obj_ver_id),
  198. },
  199. };
  200. op->callback = [this, pool_id, pg_num, fb, peer_osd](osd_op_t *op)
  201. {
  202. handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval);
  203. delete op;
  204. };
  205. msgr.outbox_push(op);
  206. }
  207. }
  208. bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
  209. {
  210. if (!no_recovery)
  211. {
  212. for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
  213. {
  214. if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
  215. {
  216. for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
  217. {
  218. if (recovery_ops.find(obj_it->first) == recovery_ops.end())
  219. {
  220. op.degraded = true;
  221. op.oid = obj_it->first;
  222. return true;
  223. }
  224. }
  225. }
  226. }
  227. }
  228. if (!no_rebalance)
  229. {
  230. for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
  231. {
  232. // Don't try to "recover" misplaced objects if "recovery" would make them degraded
  233. if ((pg_it->second.state & (PG_ACTIVE | PG_DEGRADED | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
  234. {
  235. for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
  236. {
  237. if (recovery_ops.find(obj_it->first) == recovery_ops.end())
  238. {
  239. op.degraded = false;
  240. op.oid = obj_it->first;
  241. return true;
  242. }
  243. }
  244. }
  245. }
  246. }
  247. return false;
  248. }
// Submit a "recovery" operation for one object: a zero-length primary write
// targeted at the object, which makes the primary write path read and
// rewrite the object's degraded/misplaced parts (see continue_recovery()).
void osd_t::submit_recovery_op(osd_recovery_op_t *op)
{
    op->osd_op = new osd_op_t();
    op->osd_op->op_type = OSD_OP_OUT;
    // An empty (len = 0) write at the object's own inode/stripe
    op->osd_op->req = (osd_any_op_t){
        .rw = {
            .header = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = 1,
                .opcode = OSD_OP_WRITE,
            },
            .inode = op->oid.inode,
            .offset = op->oid.stripe,
            .len = 0,
        },
    };
    if (log_level > 2)
    {
        printf("Submitting recovery operation for %lx:%lx\n", op->oid.inode, op->oid.stripe);
    }
    op->osd_op->callback = [this, op](osd_op_t *osd_op)
    {
        if (osd_op->reply.hdr.retval < 0)
        {
            // Error recovering object
            if (osd_op->reply.hdr.retval == -EPIPE)
            {
                // PG is stopped or one of the OSDs is gone, error is harmless
                printf(
                    "Recovery operation failed with object %lx:%lx (PG %u/%u)\n",
                    op->oid.inode, op->oid.stripe, INODE_POOL(op->oid.inode),
                    map_to_pg(op->oid, st_cli.pool_config.at(INODE_POOL(op->oid.inode)).pg_stripe_size)
                );
            }
            else
            {
                // Any other recovery failure is fatal for the OSD
                throw std::runtime_error("Failed to recover an object");
            }
        }
        // CAREFUL! op = &recovery_ops[op->oid]. Don't access op->* after recovery_ops.erase()
        op->osd_op = NULL;
        recovery_ops.erase(op->oid);
        delete osd_op;
        if (immediate_commit != IMMEDIATE_ALL)
        {
            recovery_done++;
            if (recovery_done >= recovery_sync_batch)
            {
                // Force sync every <recovery_sync_batch> operations
                // This is required not to pile up an excessive amount of delete operations
                autosync();
                recovery_done = 0;
            }
        }
        // Refill the recovery queue with the next object(s)
        continue_recovery();
    };
    exec_op(op->osd_op);
}
  307. // Just trigger write requests for degraded objects. They'll be recovered during writing
  308. bool osd_t::continue_recovery()
  309. {
  310. while (recovery_ops.size() < recovery_queue_depth)
  311. {
  312. osd_recovery_op_t op;
  313. if (pick_next_recovery(op))
  314. {
  315. recovery_ops[op.oid] = op;
  316. submit_recovery_op(&recovery_ops[op.oid]);
  317. }
  318. else
  319. return false;
  320. }
  321. return true;
  322. }