Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

#include "osd.h"

#define FLUSH_BATCH 512
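
// Submit rollback/stabilize operations for the unsubmitted flush actions of a PG,
// batched per peer OSD with up to FLUSH_BATCH object versions per operation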
void osd_t::submit_pg_flush_ops(pg_t & pg)
{
    pg_flush_batch_t *fb = new pg_flush_batch_t();
    pg.flush_batch = fb;
    auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
    bool first = true;
    while (it != pg.flush_actions.end())
    {
        if (!first && (it->first.oid.inode != prev_it->first.oid.inode ||
            (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK)) &&
            (fb->rollback_lists[it->first.osd_num].size() >= FLUSH_BATCH ||
            fb->stable_lists[it->first.osd_num].size() >= FLUSH_BATCH))
        {
            // Stop only at the object boundary
            break;
        }
        it->second.submitted = true;
        if (it->second.rollback)
        {
            fb->flush_objects++;
            fb->rollback_lists[it->first.osd_num].push_back((obj_ver_id){
                .oid = it->first.oid,
                .version = it->second.rollback_to,
            });
        }
        if (it->second.make_stable)
        {
            fb->flush_objects++;
            fb->stable_lists[it->first.osd_num].push_back((obj_ver_id){
                .oid = it->first.oid,
                .version = it->second.stable_to,
            });
        }
        prev_it = it;
        first = false;
        it++;
    }
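    // Send the accumulated per-OSD lists as rollback and stabilize operations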
    for (auto & l: fb->rollback_lists)
    {
        if (l.second.size() > 0)
        {
            fb->flush_ops++;
            submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
        }
    }
    for (auto & l: fb->stable_lists)
    {
        if (l.second.size() > 0)
        {
            fb->flush_ops++;
            submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
        }
    }
}
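
// Handle completion of one rollback/stabilize operation belonging to a flush batch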
void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval)
{
    pool_pg_num_t pg_id = { .pool_id = pool_id, .pg_num = pg_num };
    if (pgs.find(pg_id) == pgs.end() || pgs[pg_id].flush_batch != fb)
    {
        // Throw the result away
        return;
    }
    if (retval != 0)
    {
        if (peer_osd == this->osd_num)
        {
            throw std::runtime_error(
                std::string(rollback
                    ? "Error while doing local rollback operation: "
                    : "Error while doing local stabilize operation: "
                ) + strerror(-retval)
            );
        }
        else
        {
            printf("Error while doing flush on OSD %lu: %d (%s)\n", peer_osd, retval, strerror(-retval));
            auto fd_it = c_cli.osd_peer_fds.find(peer_osd);
            if (fd_it != c_cli.osd_peer_fds.end())
            {
                c_cli.stop_client(fd_it->second);
            }
            return;
        }
    }
    fb->flush_done++;
    if (fb->flush_done == fb->flush_ops)
    {
        // This flush batch is done
        std::vector<osd_op_t*> continue_ops;
        auto & pg = pgs[pg_id];
        auto it = pg.flush_actions.begin(), prev_it = it;
        auto erase_start = it;
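        // Erase all submitted flush actions and, at each object boundary,
        // resume writes that were waiting for that object to be flushed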
        while (1)
        {
            if (it == pg.flush_actions.end() ||
                it->first.oid.inode != prev_it->first.oid.inode ||
                (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK))
            {
                pg.ver_override.erase((object_id){
                    .inode = prev_it->first.oid.inode,
                    .stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
                });
                auto wr_it = pg.write_queue.find((object_id){
                    .inode = prev_it->first.oid.inode,
                    .stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
                });
                if (wr_it != pg.write_queue.end())
                {
                    continue_ops.push_back(wr_it->second);
                    pg.write_queue.erase(wr_it);
                }
            }
            if ((it == pg.flush_actions.end() || !it->second.submitted) &&
                erase_start != it)
            {
                pg.flush_actions.erase(erase_start, it);
            }
            if (it == pg.flush_actions.end())
            {
                break;
            }
            prev_it = it;
            if (!it->second.submitted)
            {
                it++;
                erase_start = it;
            }
            else
            {
                it++;
            }
        }
        delete fb;
        pg.flush_batch = NULL;
        if (!pg.flush_actions.size())
        {
            pg.state = pg.state & ~PG_HAS_UNCLEAN;
            report_pg_state(pg);
        }
        for (osd_op_t *op: continue_ops)
        {
            continue_primary_write(op);
        }
        if (pg.inflight == 0 && (pg.state & PG_STOPPING))
        {
            finish_stop_pg(pg);
        }
    }
}
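
// Send one rollback or stabilize operation with a list of object versions,
// either to the local blockstore or to a peer OSD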
void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
{
    osd_op_t *op = new osd_op_t();
    // Copy buffer so it gets freed along with the operation
    op->buf = malloc_or_die(sizeof(obj_ver_id) * count);
    memcpy(op->buf, data, sizeof(obj_ver_id) * count);
    if (peer_osd == this->osd_num)
    {
        // local
        clock_gettime(CLOCK_REALTIME, &op->tv_begin);
        op->bs_op = new blockstore_op_t((blockstore_op_t){
            .opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
            .callback = [this, op, pool_id, pg_num, fb](blockstore_op_t *bs_op)
            {
                add_bs_subop_stats(op);
                handle_flush_op(bs_op->opcode == BS_OP_ROLLBACK, pool_id, pg_num, fb, this->osd_num, bs_op->retval);
                delete op->bs_op;
                op->bs_op = NULL;
                delete op;
            },
            .len = (uint32_t)count,
            .buf = op->buf,
        });
        bs->enqueue_op(op->bs_op);
    }
    else
    {
        // Peer
        int peer_fd = c_cli.osd_peer_fds[peer_osd];
        op->op_type = OSD_OP_OUT;
        op->iov.push_back(op->buf, count * sizeof(obj_ver_id));
        op->peer_fd = peer_fd;
        op->req = (osd_any_op_t){
            .sec_stab = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = c_cli.next_subop_id++,
                    .opcode = (uint64_t)(rollback ? OSD_OP_SEC_ROLLBACK : OSD_OP_SEC_STABILIZE),
                },
                .len = count * sizeof(obj_ver_id),
            },
        };
        op->callback = [this, pool_id, pg_num, fb, peer_osd](osd_op_t *op)
        {
            handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval);
            delete op;
        };
        c_cli.outbox_push(op);
    }
}
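
// Pick the next object for recovery: degraded objects first, then misplaced ones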
bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
{
    for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
    {
        if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
        {
            for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
            {
                if (recovery_ops.find(obj_it->first) == recovery_ops.end())
                {
                    op.degraded = true;
                    op.oid = obj_it->first;
                    return true;
                }
            }
        }
    }
    for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
    {
        if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
        {
            for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
            {
                if (recovery_ops.find(obj_it->first) == recovery_ops.end())
                {
                    op.degraded = false;
                    op.oid = obj_it->first;
                    return true;
                }
            }
        }
    }
    return false;
}
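
// Submit a zero-length primary write for the object: the write path
// notices its degraded/misplaced state and rewrites it in the right place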
void osd_t::submit_recovery_op(osd_recovery_op_t *op)
{
    op->osd_op = new osd_op_t();
    op->osd_op->op_type = OSD_OP_OUT;
    op->osd_op->req = (osd_any_op_t){
        .rw = {
            .header = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = 1,
                .opcode = OSD_OP_WRITE,
            },
            .inode = op->oid.inode,
            .offset = op->oid.stripe,
            .len = 0,
        },
    };
    if (log_level > 2)
    {
        printf("Submitting recovery operation for %lx:%lx\n", op->oid.inode, op->oid.stripe);
    }
    op->osd_op->callback = [this, op](osd_op_t *osd_op)
    {
        // Don't sync the write, it will be synced by our regular sync coroutine
        if (osd_op->reply.hdr.retval < 0)
        {
            // Error recovering object
            if (osd_op->reply.hdr.retval == -EPIPE)
            {
                // PG is stopped or one of the OSDs is gone, error is harmless
                printf(
                    "Recovery operation failed with object %lx:%lx (PG %u/%u)\n",
                    op->oid.inode, op->oid.stripe, INODE_POOL(op->oid.inode),
                    map_to_pg(op->oid, st_cli.pool_config.at(INODE_POOL(op->oid.inode)).pg_stripe_size)
                );
            }
            else
            {
                throw std::runtime_error("Failed to recover an object");
            }
        }
        // CAREFUL! op = &recovery_ops[op->oid]. Don't access op->* after recovery_ops.erase()
        op->osd_op = NULL;
        recovery_ops.erase(op->oid);
        delete osd_op;
        continue_recovery();
    };
    exec_op(op->osd_op);
}

// Just trigger write requests for degraded objects. They'll be recovered during writing
bool osd_t::continue_recovery()
{
    while (recovery_ops.size() < recovery_queue_depth)
    {
        osd_recovery_op_t op;
        if (pick_next_recovery(op))
        {
            recovery_ops[op.oid] = op;
            submit_recovery_op(&recovery_ops[op.oid]);
        }
        else
        {
            return false;
        }
    }
    return true;
}