Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)

#include <stdexcept>
#include <assert.h>
#include "cluster_client.h"

#define SCRAP_BUFFER_SIZE 4*1024*1024
#define PART_SENT 1
#define PART_DONE 2
#define PART_ERROR 4
#define CACHE_DIRTY 1
#define CACHE_FLUSHING 2
#define CACHE_REPEATING 3
#define OP_FLUSH_BUFFER 2
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
    config = osd_messenger_t::read_config(config);
    this->ringloop = ringloop;
    this->tfd = tfd;
    this->config = config;
    msgr.osd_num = 0;
    msgr.tfd = tfd;
    msgr.ringloop = ringloop;
    msgr.repeer_pgs = [this](osd_num_t peer_osd)
    {
        if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
        {
            // peer_osd just connected
            continue_ops();
        }
        else if (dirty_buffers.size())
        {
            // peer_osd just dropped connection
            // determine WHICH dirty_buffers are now obsolete and repeat them
            for (auto & wr: dirty_buffers)
            {
                if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
                    wr.second.state != CACHE_REPEATING)
                {
                    // FIXME: Flush in larger parts
                    flush_buffer(wr.first, &wr.second);
                }
            }
            continue_ops();
        }
    };
    msgr.exec_op = [this](osd_op_t *op)
    {
        // Garbage in
        fprintf(stderr, "Incoming garbage from peer %d\n", op->peer_fd);
        msgr.stop_client(op->peer_fd);
        delete op;
    };
    msgr.parse_config(this->config);
    msgr.init();
    st_cli.tfd = tfd;
    st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
    st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
    st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_hook(changes); };
    st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
    st_cli.parse_config(config);
    st_cli.load_global_config();
    scrap_buffer_size = SCRAP_BUFFER_SIZE;
    scrap_buffer = malloc_or_die(scrap_buffer_size);
    if (ringloop)
    {
        consumer.loop = [this]()
        {
            msgr.read_requests();
            msgr.send_replies();
            this->ringloop->submit();
        };
        ringloop->register_consumer(&consumer);
    }
}
cluster_client_t::~cluster_client_t()
{
    for (auto bp: dirty_buffers)
    {
        free(bp.second.buf);
    }
    dirty_buffers.clear();
    if (ringloop)
    {
        ringloop->unregister_consumer(&consumer);
    }
    free(scrap_buffer);
}
cluster_op_t::~cluster_op_t()
{
    if (buf)
    {
        free(buf);
        buf = NULL;
    }
    if (bitmap_buf)
    {
        free(bitmap_buf);
        part_bitmaps = NULL;
        bitmap_buf = NULL;
    }
}
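// Count how many earlier queued operations this one must wait for:
// writes wait for preceding syncs and buffer flushes, syncs wait for all
// preceding writes and syncs, reads only wait for buffer flushes.
// If nothing blocks the operation and PGs are loaded, start it right away.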
void cluster_client_t::calc_wait(cluster_op_t *op)
{
    op->prev_wait = 0;
    if (op->opcode == OSD_OP_WRITE)
    {
        for (auto prev = op->prev; prev; prev = prev->prev)
        {
            if (prev->opcode == OSD_OP_SYNC ||
                prev->opcode == OSD_OP_WRITE && !(op->flags & OP_FLUSH_BUFFER) && (prev->flags & OP_FLUSH_BUFFER))
            {
                op->prev_wait++;
            }
        }
        if (!op->prev_wait && pgs_loaded)
            continue_rw(op);
    }
    else if (op->opcode == OSD_OP_SYNC)
    {
        for (auto prev = op->prev; prev; prev = prev->prev)
        {
            if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE)
            {
                op->prev_wait++;
            }
        }
        if (!op->prev_wait && pgs_loaded)
            continue_sync(op);
    }
    else
    {
        for (auto prev = op->prev; prev; prev = prev->prev)
        {
            if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER)
            {
                op->prev_wait++;
            }
            else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ)
            {
                // Flushes are always in the beginning
                break;
            }
        }
        if (!op->prev_wait && pgs_loaded)
            continue_rw(op);
    }
}
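// Adjust the prev_wait counters of the operations queued after a completed
// (or newly injected) write/sync and continue any operation whose counter
// drops to zero.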
void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc)
{
    if (opcode == OSD_OP_WRITE)
    {
        while (next)
        {
            auto n2 = next->next;
            if (next->opcode == OSD_OP_SYNC ||
                next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER) ||
                next->opcode == OSD_OP_READ && (flags & OP_FLUSH_BUFFER))
            {
                next->prev_wait += inc;
                if (!next->prev_wait)
                {
                    if (next->opcode == OSD_OP_SYNC)
                        continue_sync(next);
                    else
                        continue_rw(next);
                }
            }
            next = n2;
        }
    }
    else if (opcode == OSD_OP_SYNC)
    {
        while (next)
        {
            auto n2 = next->next;
            if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE)
            {
                next->prev_wait += inc;
                if (!next->prev_wait)
                {
                    if (next->opcode == OSD_OP_SYNC)
                        continue_sync(next);
                    else
                        continue_rw(next);
                }
            }
            next = n2;
        }
    }
}
void cluster_client_t::erase_op(cluster_op_t *op)
{
    uint64_t opcode = op->opcode, flags = op->flags;
    cluster_op_t *next = op->next;
    if (op->prev)
        op->prev->next = op->next;
    if (op->next)
        op->next->prev = op->prev;
    if (op_queue_head == op)
        op_queue_head = op->next;
    if (op_queue_tail == op)
        op_queue_tail = op->prev;
    op->next = op->prev = NULL;
    std::function<void(cluster_op_t*)>(op->callback)(op);
    if (!immediate_commit)
        inc_wait(opcode, flags, next, -1);
}
void cluster_client_t::continue_ops(bool up_retry)
{
    if (!pgs_loaded)
    {
        // We're offline
        return;
    }
    if (continuing_ops)
    {
        // Attempt to reenter the function
        return;
    }
restart:
    continuing_ops = 1;
    for (auto op = op_queue_head; op; )
    {
        cluster_op_t *next_op = op->next;
        if (!op->up_wait || up_retry)
        {
            op->up_wait = false;
            if (!op->prev_wait)
            {
                if (op->opcode == OSD_OP_SYNC)
                    continue_sync(op);
                else
                    continue_rw(op);
            }
        }
        op = next_op;
        if (continuing_ops == 2)
        {
            goto restart;
        }
    }
    continuing_ops = 0;
}
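// Despite the name, this returns log2(value) if value is a power of two,
// and 64 otherwise (the caller treats >= 64 as "not a power of two").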
static uint32_t is_power_of_two(uint64_t value)
{
    uint32_t l = 0;
    while (value > 1)
    {
        if (value & 1)
        {
            return 64;
        }
        value = value >> 1;
        l++;
    }
    return l;
}
void cluster_client_t::on_load_config_hook(json11::Json::object & config)
{
    bs_block_size = config["block_size"].uint64_value();
    bs_bitmap_granularity = config["bitmap_granularity"].uint64_value();
    if (!bs_block_size)
    {
        bs_block_size = DEFAULT_BLOCK_SIZE;
    }
    if (!bs_bitmap_granularity)
    {
        bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
    }
    bs_bitmap_size = bs_block_size / bs_bitmap_granularity / 8;
    uint32_t block_order;
    if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE)
    {
        throw std::runtime_error("Bad block size");
    }
    // Cluster-wide immediate_commit mode
    immediate_commit = (config["immediate_commit"] == "all");
    if (config.find("client_max_dirty_bytes") != config.end())
    {
        client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
    }
    else if (config.find("client_dirty_limit") != config.end())
    {
        // Old name
        client_max_dirty_bytes = config["client_dirty_limit"].uint64_value();
    }
    if (config.find("client_max_dirty_ops") != config.end())
    {
        client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value();
    }
    if (!client_max_dirty_bytes)
    {
        client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES;
    }
    if (!client_max_dirty_ops)
    {
        client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS;
    }
    up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value();
    if (!up_wait_retry_interval)
    {
        up_wait_retry_interval = 500;
    }
    else if (up_wait_retry_interval < 50)
    {
        up_wait_retry_interval = 50;
    }
    msgr.parse_config(config);
    msgr.parse_config(this->config);
    st_cli.load_pgs();
}
void cluster_client_t::on_load_pgs_hook(bool success)
{
    for (auto pool_item: st_cli.pool_config)
    {
        pg_counts[pool_item.first] = pool_item.second.real_pg_count;
    }
    pgs_loaded = true;
    for (auto fn: on_ready_hooks)
    {
        fn();
    }
    on_ready_hooks.clear();
    for (auto op: offline_ops)
    {
        execute(op);
    }
    offline_ops.clear();
    continue_ops();
}
void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes)
{
    for (auto pool_item: st_cli.pool_config)
    {
        if (pg_counts[pool_item.first] != pool_item.second.real_pg_count)
        {
            // At this point, all pool operations should have been suspended
            // And now they have to be resliced!
            for (auto op = op_queue_head; op; op = op->next)
            {
                if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) &&
                    INODE_POOL(op->cur_inode) == pool_item.first)
                {
                    op->needs_reslice = true;
                }
            }
            pg_counts[pool_item.first] = pool_item.second.real_pg_count;
        }
    }
    continue_ops();
}
void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
{
    if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end())
    {
        msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
    }
}
bool cluster_client_t::is_ready()
{
    return pgs_loaded;
}
void cluster_client_t::on_ready(std::function<void(void)> fn)
{
    if (pgs_loaded)
    {
        fn();
    }
    else
    {
        on_ready_hooks.push_back(fn);
    }
}
/**
 * How writes are synced when immediate_commit is false
 *
 * "Continue" WRITE:
 * 1) if the operation is not sliced yet - slice it
 * 2) if the operation doesn't require reslice - try to connect & send all remaining parts
 * 3) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or small timeout
 * 4) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch"
 * 5) if PG count changes before all parts are done, wait for all in-progress parts to finish,
 *    throw all results away, reslice and resubmit op
 * 6) when all parts are done, try to "continue" the current SYNC
 * 7) if the operation succeeds, but then some OSDs drop their connections, repeat
 *    parts from the current "unsynced batch" previously sent to those OSDs in any order
 *
 * "Continue" current SYNC:
 * 1) take all unsynced operations from the current batch
 * 2) check if all affected OSDs are still alive
 * 3) if yes, send all SYNCs. otherwise, leave current SYNC as is.
 * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
 * 5) if any of them fail due to other errors, fail the SYNC operation
 */
void cluster_client_t::execute(cluster_op_t *op)
{
    if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE)
    {
        op->retval = -EINVAL;
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return;
    }
    op->cur_inode = op->inode;
    op->retval = 0;
    if (op->opcode == OSD_OP_WRITE && !immediate_commit)
    {
        if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
        {
            // Push an extra SYNC operation to flush previous writes
            cluster_op_t *sync_op = new cluster_op_t;
            sync_op->opcode = OSD_OP_SYNC;
            sync_op->callback = [](cluster_op_t* sync_op)
            {
                delete sync_op;
            };
            sync_op->prev = op_queue_tail;
            if (op_queue_tail)
            {
                op_queue_tail->next = sync_op;
                op_queue_tail = sync_op;
            }
            else
                op_queue_tail = op_queue_head = sync_op;
            dirty_bytes = 0;
            dirty_ops = 0;
            calc_wait(sync_op);
        }
        dirty_bytes += op->len;
        dirty_ops++;
    }
    else if (op->opcode == OSD_OP_SYNC)
    {
        dirty_bytes = 0;
        dirty_ops = 0;
    }
    op->prev = op_queue_tail;
    if (op_queue_tail)
    {
        op_queue_tail->next = op;
        op_queue_tail = op;
    }
    else
        op_queue_tail = op_queue_head = op;
    if (!immediate_commit)
        calc_wait(op);
    else if (pgs_loaded)
    {
        if (op->opcode == OSD_OP_SYNC)
            continue_sync(op);
        else
            continue_rw(op);
    }
}
void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers)
{
    // Save operation for replay when one of PGs goes out of sync
    // (primary OSD drops our connection in this case)
    auto dirty_it = dirty_buffers.lower_bound((object_id){
        .inode = op->inode,
        .stripe = op->offset,
    });
    while (dirty_it != dirty_buffers.begin())
    {
        dirty_it--;
        if (dirty_it->first.inode != op->inode ||
            (dirty_it->first.stripe + dirty_it->second.len) <= op->offset)
        {
            dirty_it++;
            break;
        }
    }
    uint64_t pos = op->offset, len = op->len, iov_idx = 0, iov_pos = 0;
    while (len > 0)
    {
        uint64_t new_len = 0;
        if (dirty_it == dirty_buffers.end())
        {
            new_len = len;
        }
        else if (dirty_it->first.inode != op->inode || dirty_it->first.stripe > pos)
        {
            new_len = dirty_it->first.stripe - pos;
            if (new_len > len)
            {
                new_len = len;
            }
        }
        if (new_len > 0)
        {
            dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
                .inode = op->inode,
                .stripe = pos,
            }, (cluster_buffer_t){
                .buf = malloc_or_die(new_len),
                .len = new_len,
            });
        }
        // FIXME: Split big buffers into smaller ones on overwrites. But this will require refcounting
        dirty_it->second.state = CACHE_DIRTY;
        uint64_t cur_len = (dirty_it->first.stripe + dirty_it->second.len - pos);
        if (cur_len > len)
        {
            cur_len = len;
        }
        while (cur_len > 0 && iov_idx < op->iov.count)
        {
            unsigned iov_len = (op->iov.buf[iov_idx].iov_len - iov_pos);
            if (iov_len <= cur_len)
            {
                memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe,
                    op->iov.buf[iov_idx].iov_base + iov_pos, iov_len);
                pos += iov_len;
                len -= iov_len;
                cur_len -= iov_len;
                iov_pos = 0;
                iov_idx++;
            }
            else
            {
                memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe,
                    op->iov.buf[iov_idx].iov_base + iov_pos, cur_len);
                pos += cur_len;
                len -= cur_len;
                iov_pos += cur_len;
                cur_len = 0;
            }
        }
        dirty_it++;
    }
}
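// Re-queue one saved dirty buffer as an OP_FLUSH_BUFFER write at the head of
// the operation queue (used to replay writes after a primary OSD disconnect).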
void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
{
    wr->state = CACHE_REPEATING;
    cluster_op_t *op = new cluster_op_t;
    op->flags = OP_FLUSH_BUFFER;
    op->opcode = OSD_OP_WRITE;
    op->cur_inode = op->inode = oid.inode;
    op->offset = oid.stripe;
    op->len = wr->len;
    op->iov.push_back(wr->buf, wr->len);
    op->callback = [wr](cluster_op_t* op)
    {
        if (wr->state == CACHE_REPEATING)
        {
            wr->state = CACHE_DIRTY;
        }
        delete op;
    };
    op->next = op_queue_head;
    if (op_queue_head)
    {
        op_queue_head->prev = op;
        op_queue_head = op;
    }
    else
        op_queue_tail = op_queue_head = op;
    inc_wait(op->opcode, op->flags, op->next, 1);
    continue_rw(op);
}
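// State machine for a single READ/WRITE operation:
// state 0 - validate the request, 1 - slice it into per-PG parts,
// 2 - send the unsent parts, 3 - wait for in-flight parts and either
// finish, retry after -EPIPE, or reslice when the PG count has changed.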
int cluster_client_t::continue_rw(cluster_op_t *op)
{
    if (op->state == 0)
        goto resume_0;
    else if (op->state == 1)
        goto resume_1;
    else if (op->state == 2)
        goto resume_2;
    else if (op->state == 3)
        goto resume_3;
resume_0:
    if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)
    {
        op->retval = -EINVAL;
        erase_op(op);
        return 1;
    }
    {
        pool_id_t pool_id = INODE_POOL(op->cur_inode);
        if (!pool_id)
        {
            op->retval = -EINVAL;
            erase_op(op);
            return 1;
        }
        if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
            st_cli.pool_config[pool_id].real_pg_count == 0)
        {
            // Postpone operations to unknown pools
            return 0;
        }
    }
    if (op->opcode == OSD_OP_WRITE)
    {
        auto ino_it = st_cli.inode_config.find(op->inode);
        if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
        {
            op->retval = -EINVAL;
            erase_op(op);
            return 1;
        }
        if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER))
        {
            copy_write(op, dirty_buffers);
        }
    }
resume_1:
    // Slice the operation into parts
    slice_rw(op);
    op->needs_reslice = false;
resume_2:
    // Send unsent parts, if they're not subject to change
    op->state = 3;
    if (op->needs_reslice)
    {
        for (int i = 0; i < op->parts.size(); i++)
        {
            if (!(op->parts[i].flags & PART_SENT) && op->retval)
            {
                op->retval = -EPIPE;
            }
        }
        goto resume_3;
    }
    for (int i = 0; i < op->parts.size(); i++)
    {
        if (!(op->parts[i].flags & PART_SENT))
        {
            if (!try_send(op, i))
            {
                // We'll need to retry again
                op->up_wait = true;
                if (!retry_timeout_id)
                {
                    retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int)
                    {
                        retry_timeout_id = 0;
                        continue_ops(true);
                    });
                }
                op->state = 2;
            }
        }
    }
    if (op->state == 2)
    {
        return 0;
    }
resume_3:
    if (op->inflight_count > 0)
    {
        op->state = 3;
        return 0;
    }
    if (op->done_count >= op->parts.size())
    {
        // Finished successfully
        // Even if the PG count has changed in meanwhile we treat it as success
        // because if some operations were invalid for the new PG count we'd get errors
        bool is_read = op->opcode == OSD_OP_READ;
        if (is_read)
        {
            // Check parent inode
            auto ino_it = st_cli.inode_config.find(op->cur_inode);
            while (ino_it != st_cli.inode_config.end() && ino_it->second.parent_id &&
                INODE_POOL(ino_it->second.parent_id) == INODE_POOL(op->cur_inode))
            {
                // Skip parents from the same pool
                ino_it = st_cli.inode_config.find(ino_it->second.parent_id);
            }
            if (ino_it != st_cli.inode_config.end() &&
                ino_it->second.parent_id)
            {
                // Continue reading from the parent inode
                op->cur_inode = ino_it->second.parent_id;
                op->parts.clear();
                op->done_count = 0;
                goto resume_1;
            }
        }
        op->retval = op->len;
        erase_op(op);
        return 1;
    }
    else if (op->retval != 0 && op->retval != -EPIPE)
    {
        // Fatal error (not -EPIPE)
        erase_op(op);
        return 1;
    }
    else
    {
        // -EPIPE - clear the error and retry
        op->retval = 0;
        if (op->needs_reslice)
        {
            op->parts.clear();
            op->done_count = 0;
            goto resume_1;
        }
        else
        {
            for (int i = 0; i < op->parts.size(); i++)
            {
                op->parts[i].flags = 0;
            }
            goto resume_2;
        }
    }
    return 0;
}
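// Add the next <size> bytes of the operation's iovec list to the part's iovec list.
// When skip is true the data is not needed: the position is just advanced and,
// if a scrap buffer is given, the skipped range is read into it instead.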
static void add_iov(int size, bool skip, cluster_op_t *op, int &iov_idx, size_t &iov_pos, osd_op_buf_list_t &iov, void *scrap, int scrap_len)
{
    int left = size;
    while (left > 0 && iov_idx < op->iov.count)
    {
        int cur_left = op->iov.buf[iov_idx].iov_len - iov_pos;
        if (cur_left < left)
        {
            if (!skip)
            {
                iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, cur_left);
            }
            left -= cur_left;
            iov_pos = 0;
            iov_idx++;
        }
        else
        {
            if (!skip)
            {
                iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, left);
            }
            iov_pos += left;
            left = 0;
        }
    }
    assert(left == 0);
    if (skip && scrap_len > 0)
    {
        // All skipped ranges are read into the same useless buffer
        left = size;
        while (left > 0)
        {
            int cur_left = scrap_len < left ? scrap_len : left;
            iov.push_back(scrap, cur_left);
            left -= cur_left;
        }
    }
}
void cluster_client_t::slice_rw(cluster_op_t *op)
{
    // Slice the request into individual object stripe requests
    // Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
    uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
    uint64_t pg_block_size = bs_block_size * pg_data_size;
    uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
    op->retval = 0;
    op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
    if (op->opcode == OSD_OP_READ)
    {
        // Allocate memory for the bitmap
        unsigned object_bitmap_size = ((op->len / bs_bitmap_granularity + 7) / 8);
        object_bitmap_size = (object_bitmap_size < 8 ? 8 : object_bitmap_size);
        unsigned bitmap_mem = object_bitmap_size + (bs_bitmap_size * pg_data_size) * op->parts.size();
        if (op->bitmap_buf_size < bitmap_mem)
        {
            op->bitmap_buf = realloc_or_die(op->bitmap_buf, bitmap_mem);
            if (!op->bitmap_buf_size)
            {
                // First allocation
                memset(op->bitmap_buf, 0, object_bitmap_size);
            }
            op->part_bitmaps = op->bitmap_buf + object_bitmap_size;
            op->bitmap_buf_size = bitmap_mem;
        }
    }
    int iov_idx = 0;
    size_t iov_pos = 0;
    int i = 0;
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
        uint64_t begin = (op->offset < stripe ? stripe : op->offset);
        uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
            ? (stripe + pg_block_size) : (op->offset + op->len);
        op->parts[i].iov.reset();
        if (op->cur_inode != op->inode)
        {
            // Read remaining parts from upper layers
            uint64_t prev = begin, cur = begin;
            bool skip_prev = true;
            while (cur < end)
            {
                unsigned bmp_loc = (cur - op->offset)/bs_bitmap_granularity;
                bool skip = (((*(uint8_t*)(op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1);
                if (skip_prev != skip)
                {
                    if (cur > prev)
                    {
                        if (prev == begin && skip_prev)
                        {
                            begin = cur;
                            // Just advance iov_idx & iov_pos
                            add_iov(cur-prev, true, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
                        }
                        else
                            add_iov(cur-prev, skip_prev, op, iov_idx, iov_pos, op->parts[i].iov, scrap_buffer, scrap_buffer_size);
                    }
                    skip_prev = skip;
                    prev = cur;
                }
                cur += bs_bitmap_granularity;
            }
            assert(cur > prev);
            if (skip_prev)
            {
                // Just advance iov_idx & iov_pos
                add_iov(end-prev, true, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
                end = prev;
            }
            else
                add_iov(cur-prev, skip_prev, op, iov_idx, iov_pos, op->parts[i].iov, scrap_buffer, scrap_buffer_size);
            if (end == begin)
                op->done_count++;
        }
        else
        {
            add_iov(end-begin, false, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
        }
        op->parts[i].parent = op;
        op->parts[i].offset = begin;
        op->parts[i].len = (uint32_t)(end - begin);
        op->parts[i].pg_num = pg_num;
        op->parts[i].osd_num = 0;
        op->parts[i].flags = 0;
        i++;
    }
}
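// Check whether the given byte range of an inode maps to at least one PG
// whose current primary is <osd> (used to find writes that must be replayed
// after that OSD disconnects).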
bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd)
{
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
    uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
    uint64_t pg_block_size = bs_block_size * pg_data_size;
    uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = ((offset + len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
        auto pg_it = pool_cfg.pg_config.find(pg_num);
        if (pg_it != pool_cfg.pg_config.end() && pg_it->second.cur_primary == osd)
        {
            return true;
        }
    }
    return false;
}
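// Try to send part <i> of the operation to the current primary OSD of its PG.
// Returns false if the PG is paused or its primary is not connected yet
// (in which case a connection attempt is started).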
bool cluster_client_t::try_send(cluster_op_t *op, int i)
{
    auto part = &op->parts[i];
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
    auto pg_it = pool_cfg.pg_config.find(part->pg_num);
    if (pg_it != pool_cfg.pg_config.end() &&
        !pg_it->second.pause && pg_it->second.cur_primary)
    {
        osd_num_t primary_osd = pg_it->second.cur_primary;
        auto peer_it = msgr.osd_peer_fds.find(primary_osd);
        if (peer_it != msgr.osd_peer_fds.end())
        {
            int peer_fd = peer_it->second;
            part->osd_num = primary_osd;
            part->flags |= PART_SENT;
            op->inflight_count++;
            uint64_t pg_bitmap_size = bs_bitmap_size * (
                pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks
            );
            uint64_t meta_rev = 0;
            auto ino_it = st_cli.inode_config.find(op->inode);
            if (ino_it != st_cli.inode_config.end())
                meta_rev = ino_it->second.mod_revision;
            part->op = (osd_op_t){
                .op_type = OSD_OP_OUT,
                .peer_fd = peer_fd,
                .req = { .rw = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = op_id++,
                        .opcode = op->opcode,
                    },
                    .inode = op->cur_inode,
                    .offset = part->offset,
                    .len = part->len,
                    .meta_revision = meta_rev,
                } },
                .bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i,
                .bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size),
                .callback = [this, part](osd_op_t *op_part)
                {
                    handle_op_part(part);
                },
            };
            part->op.iov = part->iov;
            msgr.outbox_push(&part->op);
            return true;
        }
        else if (msgr.wanted_peers.find(primary_osd) == msgr.wanted_peers.end())
        {
            msgr.connect_peer(primary_osd, st_cli.peer_states[primary_osd]);
        }
    }
    return false;
}
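// Send a SYNC to every OSD that has unsynced writes. On success the flushed
// dirty buffers are freed, on failure they are returned to the dirty state
// so they can be replayed later.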
int cluster_client_t::continue_sync(cluster_op_t *op)
{
    if (op->state == 1)
        goto resume_1;
    if (immediate_commit || !dirty_osds.size())
    {
        // Sync is not required in the immediate_commit mode or if there are no dirty_osds
        op->retval = 0;
        erase_op(op);
        return 1;
    }
    // Check that all OSD connections are still alive
    for (auto do_it = dirty_osds.begin(); do_it != dirty_osds.end(); )
    {
        osd_num_t sync_osd = *do_it;
        auto peer_it = msgr.osd_peer_fds.find(sync_osd);
        if (peer_it == msgr.osd_peer_fds.end())
            dirty_osds.erase(do_it++);
        else
            do_it++;
    }
    // Post sync to affected OSDs
    for (auto & prev_op: dirty_buffers)
    {
        if (prev_op.second.state == CACHE_DIRTY)
        {
            prev_op.second.state = CACHE_FLUSHING;
        }
    }
    op->parts.resize(dirty_osds.size());
    op->retval = 0;
    {
        int i = 0;
        for (auto sync_osd: dirty_osds)
        {
            op->parts[i] = {
                .parent = op,
                .osd_num = sync_osd,
                .flags = 0,
            };
            send_sync(op, &op->parts[i]);
            i++;
        }
    }
    dirty_osds.clear();
resume_1:
    if (op->inflight_count > 0)
    {
        op->state = 1;
        return 0;
    }
    if (op->retval != 0)
    {
        for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); uw_it++)
        {
            if (uw_it->second.state == CACHE_FLUSHING)
            {
                uw_it->second.state = CACHE_DIRTY;
            }
        }
        if (op->retval == -EPIPE)
        {
            // Retry later
            op->parts.clear();
            op->retval = 0;
            op->inflight_count = 0;
            op->done_count = 0;
            op->state = 0;
            return 0;
        }
    }
    else
    {
        for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); )
        {
            if (uw_it->second.state == CACHE_FLUSHING)
            {
                free(uw_it->second.buf);
                dirty_buffers.erase(uw_it++);
            }
            else
                uw_it++;
        }
    }
    erase_op(op);
    return 1;
}
void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
{
    auto peer_it = msgr.osd_peer_fds.find(part->osd_num);
    assert(peer_it != msgr.osd_peer_fds.end());
    part->flags |= PART_SENT;
    op->inflight_count++;
    part->op = (osd_op_t){
        .op_type = OSD_OP_OUT,
        .peer_fd = peer_it->second,
        .req = {
            .hdr = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = op_id++,
                .opcode = OSD_OP_SYNC,
            },
        },
        .callback = [this, part](osd_op_t *op_part)
        {
            handle_op_part(part);
        },
    };
    msgr.outbox_push(&part->op);
}
static inline void mem_or(void *res, const void *r2, unsigned int len)
{
    unsigned int i;
    for (i = 0; i < len; ++i)
    {
        // Hope the compiler vectorizes this
        ((uint8_t*)res)[i] = ((uint8_t*)res)[i] | ((uint8_t*)r2)[i];
    }
}
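// Completion callback for one part of an operation: on error, log it, drop
// the OSD connection and schedule a retry; on success, record the OSD as
// dirty and, for reads, merge the returned bitmap into the result.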
void cluster_client_t::handle_op_part(cluster_op_part_t *part)
{
    cluster_op_t *op = part->parent;
    op->inflight_count--;
    int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;
    if (part->op.reply.hdr.retval != expected)
    {
        // Operation failed, retry
        fprintf(
            stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
            osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
        );
        if (part->op.reply.hdr.retval == -EPIPE)
        {
            // Mark op->up_wait = true before stopping the client
            op->up_wait = true;
            if (!retry_timeout_id)
            {
                retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int)
                {
                    retry_timeout_id = 0;
                    continue_ops(true);
                });
            }
        }
        if (!op->retval || op->retval == -EPIPE)
        {
            // Don't overwrite other errors with -EPIPE
            op->retval = part->op.reply.hdr.retval;
        }
        msgr.stop_client(part->op.peer_fd);
        part->flags |= PART_ERROR;
    }
    else
    {
        // OK
        dirty_osds.insert(part->osd_num);
        part->flags |= PART_DONE;
        op->done_count++;
        if (op->opcode == OSD_OP_READ)
        {
            copy_part_bitmap(op, part);
        }
    }
    if (op->inflight_count == 0)
    {
        if (op->opcode == OSD_OP_SYNC)
            continue_sync(op);
        else
            continue_rw(op);
    }
}
void cluster_client_t::copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part)
{
    // Copy (OR) bitmap
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
    uint32_t pg_block_size = bs_block_size * (
        pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks
    );
    uint32_t object_offset = (part->op.req.rw.offset - op->offset) / bs_bitmap_granularity;
    uint32_t part_offset = (part->op.req.rw.offset % pg_block_size) / bs_bitmap_granularity;
    uint32_t part_len = part->op.req.rw.len / bs_bitmap_granularity;
    if (!(object_offset & 0x7) && !(part_offset & 0x7) && (part_len >= 8))
    {
        // Copy bytes
        mem_or(op->bitmap_buf + object_offset/8, part->op.bitmap + part_offset/8, part_len/8);
        object_offset += (part_len & ~0x7);
        part_offset += (part_len & ~0x7);
        part_len = (part_len & 0x7);
    }
    while (part_len > 0)
    {
        // Copy bits
        (*(uint8_t*)(op->bitmap_buf + (object_offset >> 3))) |= (
            (((*(uint8_t*)(part->op.bitmap + (part_offset >> 3))) >> (part_offset & 0x7)) & 0x1) << (object_offset & 0x7)
        );
        part_offset++;
        object_offset++;
        part_len--;
    }
}