Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "osd_primary.h"
#include "allocator.h"
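
// The write path in this file is a resumable state machine: op_data->st
// records the step at which an operation went to sleep, and
// continue_primary_write() jumps to the matching resume_N label once the
// corresponding sub-operations complete.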

bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    // Check if actions are pending for this object
    auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
        .oid = op_data->oid,
        .osd_num = 0,
    });
    if (act_it != pg.flush_actions.end() &&
        act_it->first.oid.inode == op_data->oid.inode &&
        (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
    {
        pg.write_queue.emplace(op_data->oid, cur_op);
        return false;
    }
    // Check if there are other write requests to the same object
    auto vo_it = pg.write_queue.find(op_data->oid);
    if (vo_it != pg.write_queue.end())
    {
        op_data->st = 1;
        pg.write_queue.emplace(op_data->oid, cur_op);
        return false;
    }
    pg.write_queue.emplace(op_data->oid, cur_op);
    return true;
}
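
// Note: check_write_queue() above always enqueues the op into pg.write_queue
// and only returns true when it may start right away, i.e. when no flush
// actions and no other writes are pending for the same object. Waiting ops
// are restarted one by one from the continue_others label below.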

void osd_t::continue_primary_write(osd_op_t *cur_op)
{
    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
    {
        return;
    }
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
    if (op_data->st == 1) goto resume_1;
    else if (op_data->st == 2) goto resume_2;
    else if (op_data->st == 3) goto resume_3;
    else if (op_data->st == 4) goto resume_4;
    else if (op_data->st == 5) goto resume_5;
    else if (op_data->st == 6) goto resume_6;
    else if (op_data->st == 7) goto resume_7;
    else if (op_data->st == 8) goto resume_8;
    else if (op_data->st == 9) goto resume_9;
    else if (op_data->st == 10) goto resume_10;
    assert(op_data->st == 0);
    if (!check_write_queue(cur_op, pg))
    {
        return;
    }
resume_1:
    // Determine blocks to read and write
    // Missing chunks are allowed to be overwritten even in incomplete objects
    // FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
    if (op_data->scheme == POOL_SCHEME_REPLICATED)
    {
        // Simplified algorithm
        op_data->stripes[0].write_start = op_data->stripes[0].req_start;
        op_data->stripes[0].write_end = op_data->stripes[0].req_end;
        op_data->stripes[0].write_buf = cur_op->buf;
        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
            op_data->stripes[0].write_end != bs_block_size))
        {
            // Object is degraded/misplaced and will be moved to <write_osd_set>
            op_data->stripes[0].read_start = 0;
            op_data->stripes[0].read_end = bs_block_size;
            cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign_or_die(MEM_ALIGNMENT, bs_block_size);
        }
    }
    else
    {
        cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
            pg.pg_size, op_data->pg_data_size, pg.pg_cursize, pg.cur_set.data(), bs_block_size, clean_entry_bitmap_size);
        if (!cur_op->rmw_buf)
        {
            // Refuse partial overwrite of an incomplete object
            cur_op->reply.hdr.retval = -EINVAL;
            goto continue_others;
        }
    }
    // Read required blocks
    submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
resume_2:
    op_data->st = 2;
    return;
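    // (When the reads submitted above complete, the completion path re-enters
    // this function with op_data->st advanced to 3, resuming at resume_3.)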
resume_3:
    if (op_data->errors > 0)
    {
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    if (op_data->scheme == POOL_SCHEME_REPLICATED)
    {
        // Set bitmap bits
        bitmap_set(op_data->stripes[0].bmp_buf, op_data->stripes[0].write_start,
            op_data->stripes[0].write_end-op_data->stripes[0].write_start, bs_bitmap_granularity);
        // Possibly copy new data from the request into the recovery buffer
        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
            op_data->stripes[0].write_end != bs_block_size))
        {
            memcpy(
                op_data->stripes[0].read_buf + op_data->stripes[0].req_start,
                op_data->stripes[0].write_buf,
                op_data->stripes[0].req_end - op_data->stripes[0].req_start
            );
            op_data->stripes[0].write_buf = op_data->stripes[0].read_buf;
            op_data->stripes[0].write_start = 0;
            op_data->stripes[0].write_end = bs_block_size;
        }
    }
    else
    {
        // For EC/XOR pools, save version override to make it impossible
        // for parallel reads to read different versions of data and parity
        pg.ver_override[op_data->oid] = op_data->fact_ver;
        // Recover missing stripes, calculate parity
        if (pg.scheme == POOL_SCHEME_XOR)
        {
            calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size, clean_entry_bitmap_size);
        }
        else if (pg.scheme == POOL_SCHEME_JERASURE)
        {
            calc_rmw_parity_jerasure(op_data->stripes, pg.pg_size, op_data->pg_data_size, op_data->prev_set, pg.cur_set.data(), bs_block_size, clean_entry_bitmap_size);
        }
    }
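    // Note on versioning (illustrative): object versions pack the PG epoch
    // into the top PG_EPOCH_BITS bits and a per-object write counter into the
    // low (64-PG_EPOCH_BITS) bits. E.g., if PG_EPOCH_BITS were 48, version
    // 0x5FFFF would be epoch 5 with its 16-bit counter exhausted, so the
    // next write would have to bump the epoch first.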
    // Send writes
    if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
    {
        op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1;
    }
    else
    {
        if ((op_data->fact_ver & ((1ul<<(64-PG_EPOCH_BITS)) - 1)) == ((1ul<<(64-PG_EPOCH_BITS)) - 1))
        {
            // The write counter is exhausted, so bump the epoch
            assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1));
            pg.epoch++;
        }
        op_data->target_ver = op_data->fact_ver + 1;
    }
    if (pg.epoch > pg.reported_epoch)
    {
        // Report newer epoch before writing
        // FIXME: We may report only one PG state here...
        this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
        pg.history_changed = true;
        report_pg_states();
resume_10:
        if (pg.epoch > pg.reported_epoch)
        {
            op_data->st = 10;
            return;
        }
    }
    submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.cur_set.data(), cur_op);
resume_4:
    op_data->st = 4;
    return;
resume_5:
    if (op_data->scheme != POOL_SCHEME_REPLICATED)
    {
        // Remove version override just after the write, but before stabilizing
        pg.ver_override.erase(op_data->oid);
    }
    if (op_data->errors > 0)
    {
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    if (op_data->object_state)
    {
        // We must forget the unclean state of the object before deleting it
        // so the next reads don't accidentally read a deleted version
        // And it should be done at the same time as the removal of the version override
        remove_object_from_state(op_data->oid, op_data->object_state, pg);
        pg.clean_count++;
    }
resume_6:
resume_7:
    if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
    {
        return;
    }
    if (op_data->fact_ver == 1)
    {
        // Object is created
        pg.clean_count++;
        pg.total_count++;
    }
    if (op_data->object_state)
    {
        {
            int recovery_type = op_data->object_state->state & (OBJ_DEGRADED|OBJ_INCOMPLETE) ? 0 : 1;
            recovery_stat_count[0][recovery_type]++;
            if (!recovery_stat_count[0][recovery_type])
            {
                // Counter wrapped around: make it non-zero again and reset the byte counter
                recovery_stat_count[0][recovery_type]++;
                recovery_stat_bytes[0][recovery_type] = 0;
            }
            for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
            {
                recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
            }
        }
        // Any kind of a non-clean object can have extra chunks, because we don't record objects
        // as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
        if (immediate_commit != IMMEDIATE_ALL)
        {
            // We can't remove extra chunks yet if fsyncs are explicit, because
            // new copies may not be committed to stable storage yet
            // We can only remove extra chunks after a successful SYNC for this PG
            for (auto & chunk: op_data->object_state->osd_set)
            {
                // Check is the same as in submit_primary_del_subops()
                if (op_data->scheme == POOL_SCHEME_REPLICATED
                    ? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
                    : (chunk.osd_num != pg.cur_set[chunk.role]))
                {
                    pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
                        .osd_num = chunk.osd_num,
                        .oid = {
                            .inode = op_data->oid.inode,
                            .stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
                        },
                        .version = op_data->fact_ver,
                    });
                    copies_to_delete_after_sync_count++;
                }
            }
            free_object_state(pg, &op_data->object_state);
        }
        else
        {
            submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
            free_object_state(pg, &op_data->object_state);
            if (op_data->n_subops > 0)
            {
resume_8:
                op_data->st = 8;
                return;
resume_9:
                if (op_data->errors > 0)
                {
                    pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
                    return;
                }
            }
        }
    }
    cur_op->reply.hdr.retval = cur_op->req.rw.len;
continue_others:
    osd_op_t *next_op = NULL;
    auto next_it = pg.write_queue.find(op_data->oid);
    // Remove the operation from queue before calling finish_op so it doesn't see the completed operation in queue
    if (next_it != pg.write_queue.end() && next_it->second == cur_op)
    {
        pg.write_queue.erase(next_it++);
        if (next_it != pg.write_queue.end() && next_it->first == op_data->oid)
            next_op = next_it->second;
    }
    // finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
    finish_op(cur_op, cur_op->req.rw.len);
    if (next_op)
    {
        // Continue next write to the same object
        continue_primary_write(next_op);
    }
}
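
// remember_unstable_write() completes a write according to the commit mode:
// IMMEDIATE_ALL stabilizes EC/XOR writes right away (sub-states base_state
// and base_state+1, i.e. 6 and 7 here), IMMEDIATE_SMALL does that only for
// partial-block writes, and the lazy mode just records the write as unstable
// so a later SYNC can stabilize it.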
bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    if (op_data->st == base_state)
    {
        goto resume_6;
    }
    else if (op_data->st == base_state+1)
    {
        goto resume_7;
    }
    if (immediate_commit == IMMEDIATE_ALL)
    {
immediate:
        if (op_data->scheme != POOL_SCHEME_REPLICATED)
        {
            // Send STABILIZE ops immediately
            op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
            op_data->unstable_writes = new obj_ver_id[loc_set.size()];
            {
                int last_start = 0;
                for (auto & chunk: loc_set)
                {
                    op_data->unstable_writes[last_start] = (obj_ver_id){
                        .oid = {
                            .inode = op_data->oid.inode,
                            .stripe = op_data->oid.stripe | chunk.role,
                        },
                        .version = op_data->fact_ver,
                    };
                    op_data->unstable_write_osds->push_back((unstable_osd_num_t){
                        .osd_num = chunk.osd_num,
                        .start = last_start,
                        .len = 1,
                    });
                    last_start++;
                }
            }
            submit_primary_stab_subops(cur_op);
resume_6:
            op_data->st = 6;
            return false;
resume_7:
            // FIXME: Free those in the destructor?
            delete op_data->unstable_write_osds;
            delete[] op_data->unstable_writes;
            op_data->unstable_writes = NULL;
            op_data->unstable_write_osds = NULL;
            if (op_data->errors > 0)
            {
                pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
                return false;
            }
        }
    }
    else if (immediate_commit == IMMEDIATE_SMALL)
    {
        int stripe_count = (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : op_data->pg_size);
        for (int role = 0; role < stripe_count; role++)
        {
            if (op_data->stripes[role].write_start == 0 &&
                op_data->stripes[role].write_end == bs_block_size)
            {
                // Big write. Treat write as unsynced
                goto lazy;
            }
        }
        goto immediate;
    }
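    // (Cross-branch gotos: under IMMEDIATE_SMALL, partial-block writes jump
    // back to the `immediate:` label above, while full-block writes jump
    // forward to `lazy:` below and stay unsynced until an explicit fsync.)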
    else
    {
lazy:
        if (op_data->scheme != POOL_SCHEME_REPLICATED)
        {
            // Remember version as unstable for EC/XOR
            for (auto & chunk: loc_set)
            {
                this->dirty_osds.insert(chunk.osd_num);
                this->unstable_writes[(osd_object_id_t){
                    .osd_num = chunk.osd_num,
                    .oid = {
                        .inode = op_data->oid.inode,
                        .stripe = op_data->oid.stripe | chunk.role,
                    },
                }] = op_data->fact_ver;
            }
        }
        else
        {
            // Only remember to sync OSDs for replicated pools
            for (auto & chunk: loc_set)
            {
                this->dirty_osds.insert(chunk.osd_num);
            }
        }
        // Remember PG as dirty to drop the connection when PG goes offline
        // (this is required because of the "lazy sync")
        auto cl_it = msgr.clients.find(cur_op->peer_fd);
        if (cl_it != msgr.clients.end())
        {
            cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
        }
        dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
    }
    return true;
}