Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

266 lines
9.0 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include "osd_primary.h"
  4. // Save and clear unstable_writes -> SYNC all -> STABLE all
  5. void osd_t::continue_primary_sync(osd_op_t *cur_op)
  6. {
  7. if (!cur_op->op_data)
  8. {
  9. cur_op->op_data = (osd_primary_op_data_t*)calloc_or_die(1, sizeof(osd_primary_op_data_t));
  10. }
  11. osd_primary_op_data_t *op_data = cur_op->op_data;
  12. if (op_data->st == 1) goto resume_1;
  13. else if (op_data->st == 2) goto resume_2;
  14. else if (op_data->st == 3) goto resume_3;
  15. else if (op_data->st == 4) goto resume_4;
  16. else if (op_data->st == 5) goto resume_5;
  17. else if (op_data->st == 6) goto resume_6;
  18. else if (op_data->st == 7) goto resume_7;
  19. else if (op_data->st == 8) goto resume_8;
  20. assert(op_data->st == 0);
  21. if (syncs_in_progress.size() > 0)
  22. {
  23. // Wait for previous syncs, if any
  24. // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
  25. syncs_in_progress.push_back(cur_op);
  26. op_data->st = 1;
  27. resume_1:
  28. return;
  29. }
  30. else
  31. {
  32. syncs_in_progress.push_back(cur_op);
  33. }
  34. resume_2:
  35. if (dirty_osds.size() == 0)
  36. {
  37. // Nothing to sync
  38. goto finish;
  39. }
  40. // Save and clear unstable_writes
  41. // In theory it is possible to do in on a per-client basis, but this seems to be an unnecessary complication
  42. // It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
  43. if (unstable_writes.size() > 0)
  44. {
  45. op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
  46. op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
  47. osd_num_t last_osd = 0;
  48. int last_start = 0, last_end = 0;
  49. for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
  50. {
  51. if (last_osd != it->first.osd_num)
  52. {
  53. if (last_osd != 0)
  54. {
  55. op_data->unstable_write_osds->push_back((unstable_osd_num_t){
  56. .osd_num = last_osd,
  57. .start = last_start,
  58. .len = last_end - last_start,
  59. });
  60. }
  61. last_osd = it->first.osd_num;
  62. last_start = last_end;
  63. }
  64. op_data->unstable_writes[last_end] = (obj_ver_id){
  65. .oid = it->first.oid,
  66. .version = it->second,
  67. };
  68. last_end++;
  69. }
  70. if (last_osd != 0)
  71. {
  72. op_data->unstable_write_osds->push_back((unstable_osd_num_t){
  73. .osd_num = last_osd,
  74. .start = last_start,
  75. .len = last_end - last_start,
  76. });
  77. }
  78. this->unstable_writes.clear();
  79. }
  80. {
  81. void *dirty_buf = malloc_or_die(
  82. sizeof(pool_pg_num_t)*dirty_pgs.size() +
  83. sizeof(osd_num_t)*dirty_osds.size() +
  84. sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
  85. );
  86. op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
  87. op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
  88. op_data->dirty_pg_count = dirty_pgs.size();
  89. op_data->dirty_osd_count = dirty_osds.size();
  90. if (this->copies_to_delete_after_sync_count)
  91. {
  92. op_data->copies_to_delete_count = 0;
  93. op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
  94. for (auto dirty_pg_num: dirty_pgs)
  95. {
  96. auto & pg = pgs.at(dirty_pg_num);
  97. assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
  98. memcpy(
  99. op_data->copies_to_delete + op_data->copies_to_delete_count,
  100. pg.copies_to_delete_after_sync.data(),
  101. sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
  102. );
  103. op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
  104. this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
  105. pg.copies_to_delete_after_sync.clear();
  106. }
  107. assert(this->copies_to_delete_after_sync_count == 0);
  108. }
  109. int dpg = 0;
  110. for (auto dirty_pg_num: dirty_pgs)
  111. {
  112. pgs.at(dirty_pg_num).inflight++;
  113. op_data->dirty_pgs[dpg++] = dirty_pg_num;
  114. }
  115. dirty_pgs.clear();
  116. dpg = 0;
  117. for (auto osd_num: dirty_osds)
  118. {
  119. op_data->dirty_osds[dpg++] = osd_num;
  120. }
  121. dirty_osds.clear();
  122. }
  123. if (immediate_commit != IMMEDIATE_ALL)
  124. {
  125. // SYNC
  126. if (!submit_primary_sync_subops(cur_op))
  127. {
  128. goto resume_4;
  129. }
  130. resume_3:
  131. op_data->st = 3;
  132. return;
  133. resume_4:
  134. if (op_data->errors > 0)
  135. {
  136. goto resume_6;
  137. }
  138. }
  139. if (op_data->unstable_writes)
  140. {
  141. // Stabilize version sets, if any
  142. submit_primary_stab_subops(cur_op);
  143. resume_5:
  144. op_data->st = 5;
  145. return;
  146. }
  147. resume_6:
  148. if (op_data->errors > 0)
  149. {
  150. // Return PGs and OSDs back into their dirty sets
  151. for (int i = 0; i < op_data->dirty_pg_count; i++)
  152. {
  153. dirty_pgs.insert(op_data->dirty_pgs[i]);
  154. }
  155. for (int i = 0; i < op_data->dirty_osd_count; i++)
  156. {
  157. dirty_osds.insert(op_data->dirty_osds[i]);
  158. }
  159. if (op_data->unstable_writes)
  160. {
  161. // Return objects back into the unstable write set
  162. for (auto unstable_osd: *(op_data->unstable_write_osds))
  163. {
  164. for (int i = 0; i < unstable_osd.len; i++)
  165. {
  166. // Except those from peered PGs
  167. auto & w = op_data->unstable_writes[i];
  168. pool_pg_num_t wpg = {
  169. .pool_id = INODE_POOL(w.oid.inode),
  170. .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
  171. };
  172. if (pgs.at(wpg).state & PG_ACTIVE)
  173. {
  174. uint64_t & dest = this->unstable_writes[(osd_object_id_t){
  175. .osd_num = unstable_osd.osd_num,
  176. .oid = w.oid,
  177. }];
  178. dest = dest < w.version ? w.version : dest;
  179. dirty_pgs.insert(wpg);
  180. }
  181. }
  182. }
  183. }
  184. if (op_data->copies_to_delete)
  185. {
  186. // Return 'copies to delete' back into respective PGs
  187. for (int i = 0; i < op_data->copies_to_delete_count; i++)
  188. {
  189. auto & w = op_data->copies_to_delete[i];
  190. auto & pg = pgs.at((pool_pg_num_t){
  191. .pool_id = INODE_POOL(w.oid.inode),
  192. .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
  193. });
  194. if (pg.state & PG_ACTIVE)
  195. {
  196. pg.copies_to_delete_after_sync.push_back(w);
  197. copies_to_delete_after_sync_count++;
  198. }
  199. }
  200. }
  201. }
  202. else if (op_data->copies_to_delete)
  203. {
  204. // Actually delete copies which we wanted to delete
  205. submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
  206. resume_7:
  207. op_data->st = 7;
  208. return;
  209. resume_8:
  210. if (op_data->errors > 0)
  211. {
  212. goto resume_6;
  213. }
  214. }
  215. for (int i = 0; i < op_data->dirty_pg_count; i++)
  216. {
  217. auto & pg = pgs.at(op_data->dirty_pgs[i]);
  218. pg.inflight--;
  219. if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
  220. {
  221. finish_stop_pg(pg);
  222. }
  223. else if ((pg.state & PG_REPEERING) && pg.inflight == 0 && !pg.flush_batch)
  224. {
  225. start_pg_peering(pg);
  226. }
  227. }
  228. // FIXME: Free those in the destructor?
  229. free(op_data->dirty_pgs);
  230. op_data->dirty_pgs = NULL;
  231. op_data->dirty_osds = NULL;
  232. if (op_data->unstable_writes)
  233. {
  234. delete op_data->unstable_write_osds;
  235. delete[] op_data->unstable_writes;
  236. op_data->unstable_writes = NULL;
  237. op_data->unstable_write_osds = NULL;
  238. }
  239. if (op_data->errors > 0)
  240. {
  241. finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
  242. }
  243. else
  244. {
  245. finish:
  246. if (cur_op->peer_fd)
  247. {
  248. auto it = msgr.clients.find(cur_op->peer_fd);
  249. if (it != msgr.clients.end())
  250. it->second->dirty_pgs.clear();
  251. }
  252. finish_op(cur_op, 0);
  253. }
  254. assert(syncs_in_progress.front() == cur_op);
  255. syncs_in_progress.pop_front();
  256. if (syncs_in_progress.size() > 0)
  257. {
  258. cur_op = syncs_in_progress.front();
  259. op_data = cur_op->op_data;
  260. op_data->st++;
  261. goto resume_2;
  262. }
  263. }